Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove extra space for string value for postgresql datasource #65

Merged
merged 1 commit into from
Jan 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,28 @@ trait Processor extends Serializable {

if (row.isNullAt(index)) return null

val value = row.get(index).toString.trim
PropertyType.findByValue(fieldTypeMap(field)) match {
case PropertyType.STRING | PropertyType.FIXED_STRING => {
var value = row.get(index).toString
var stringValue = value
if (value.equals(DEFAULT_EMPTY_VALUE)) {
value = ""
stringValue = ""
}
val result = NebulaUtils.escapeUtil(value).mkString("\"", "", "\"")
val result = NebulaUtils.escapeUtil(stringValue).mkString("\"", "", "\"")
result
}
case PropertyType.DATE => "date(\"" + row.get(index) + "\")"
case PropertyType.DATETIME => "datetime(\"" + row.get(index) + "\")"
case PropertyType.TIME => "time(\"" + row.get(index) + "\")"
case PropertyType.DATE => "date(\"" + value + "\")"
case PropertyType.DATETIME => "datetime(\"" + value + "\")"
case PropertyType.TIME => "time(\"" + value + "\")"
case PropertyType.TIMESTAMP => {
val value = row.get(index).toString
if (NebulaUtils.isNumic(value)) {
value
} else {
"timestamp(\"" + row.get(index) + "\")"
"timestamp(\"" + value + "\")"
}
}
case PropertyType.GEOGRAPHY => "ST_GeogFromText(\"" + row.get(index) + "\")"
case _ => row.get(index)
case PropertyType.GEOGRAPHY => "ST_GeogFromText(\"" + value + "\")"
case _ => value
}
}

Expand All @@ -98,18 +98,18 @@ trait Processor extends Serializable {
case PropertyType.UNKNOWN =>
throw new IllegalArgumentException("date type in nebula is UNKNOWN.")
case PropertyType.STRING | PropertyType.FIXED_STRING => {
val value = row.get(index).toString
val value = row.get(index).toString.trim
if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
}
case PropertyType.BOOL => row.get(index).toString.toBoolean
case PropertyType.DOUBLE => row.get(index).toString.toDouble
case PropertyType.FLOAT => row.get(index).toString.toFloat
case PropertyType.INT8 => row.get(index).toString.toByte
case PropertyType.INT16 => row.get(index).toString.toShort
case PropertyType.INT32 => row.get(index).toString.toInt
case PropertyType.INT64 | PropertyType.VID => row.get(index).toString.toLong
case PropertyType.BOOL => row.get(index).toString.trim.toBoolean
case PropertyType.DOUBLE => row.get(index).toString.trim.toDouble
case PropertyType.FLOAT => row.get(index).toString.trim.toFloat
case PropertyType.INT8 => row.get(index).toString.trim.toByte
case PropertyType.INT16 => row.get(index).toString.trim.toShort
case PropertyType.INT32 => row.get(index).toString.trim.toInt
case PropertyType.INT64 | PropertyType.VID => row.get(index).toString.trim.toLong
case PropertyType.TIME => {
val values = row.get(index).toString.split(":")
val values = row.get(index).toString.trim.split(":")
if (values.size < 3) {
throw new UnsupportedOperationException(
s"wrong format for time value: ${row.get(index)}, correct format is 12:00:00:0000")
Expand All @@ -120,15 +120,15 @@ trait Processor extends Serializable {
new Time(values(0).toByte, values(1).toByte, sec, microSec)
}
case PropertyType.DATE => {
val values = row.get(index).toString.split("-")
val values = row.get(index).toString.trim.split("-")
if (values.size < 3) {
throw new UnsupportedOperationException(
s"wrong format for date value: ${row.get(index)}, correct format is 2020-01-01")
}
new Date(values(0).toShort, values(1).toByte, values(2).toByte)
}
case PropertyType.DATETIME => {
val rowValue = row.get(index).toString
val rowValue = row.get(index).toString.trim
var dateTimeValue: Array[String] = null
if (rowValue.contains("T")) {
dateTimeValue = rowValue.split("T")
Expand All @@ -140,7 +140,7 @@ trait Processor extends Serializable {
s"correct format is 2020-01-01T12:00:00.0000 or 2020-01-01 12:00:00.0000")
}

if (dateTimeValue.size < 2) {
if (dateTimeValue.length < 2) {
throw new UnsupportedOperationException(
s"wrong format for datetime value: $rowValue, " +
s"correct format is 2020-01-01T12:00:00.0000 or 2020-01-01 12:00:00.0000")
Expand All @@ -167,15 +167,15 @@ trait Processor extends Serializable {
microsec)
}
case PropertyType.TIMESTAMP => {
val value = row.get(index).toString
val value = row.get(index).toString.trim
if (!NebulaUtils.isNumic(value)) {
throw new IllegalArgumentException(
s"timestamp only support long type, your value is ${value}")
}
row.get(index).toString.toLong
value.toLong
}
case PropertyType.GEOGRAPHY => {
val wkt = row.get(index).toString
val wkt = row.get(index).toString.trim
val jtsGeom = new org.locationtech.jts.io.WKTReader().read(wkt)
convertJTSGeometryToGeography(jtsGeom)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class EdgeProcessor(spark: SparkSession,
if (index < 0 || row.isNullAt(index)) {
printChoice(streamFlag, s"rank must exist and cannot be null, your row data is $row")
}
val ranking = row.get(index).toString
val ranking = row.get(index).toString.trim
if (!NebulaUtils.isNumic(ranking)) {
printChoice(streamFlag,
s"Not support non-Numeric type for ranking field.your row data is $row")
Expand Down Expand Up @@ -221,7 +221,7 @@ class EdgeProcessor(spark: SparkSession,
if (index < 0 || row.isNullAt(index)) {
printChoice(streamFlag, s"$fieldType must exist and cannot be null, your row data is $row")
None
} else Some(row.get(index).toString)
} else Some(row.get(index).toString.trim)
}

val idFlag = fieldValue.isDefined
Expand Down Expand Up @@ -267,7 +267,7 @@ class EdgeProcessor(spark: SparkSession,

if (edgeConfig.rankingField.isDefined) {
val index = row.schema.fieldIndex(edgeConfig.rankingField.get)
val ranking = row.get(index).toString
val ranking = row.get(index).toString.trim
Edge(sourceField, targetField, Some(ranking.toLong), values)
} else {
Edge(sourceField, targetField, None, values)
Expand All @@ -288,7 +288,7 @@ class EdgeProcessor(spark: SparkSession,
indexCells(lat, lng).mkString(",")
} else {
val index = row.schema.fieldIndex(field)
val value = row.get(index).toString
val value = row.get(index).toString.trim
if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
}
// process string type vid
Expand All @@ -310,11 +310,11 @@ class EdgeProcessor(spark: SparkSession,
isEdgeValid(row, edgeConfig, false, vidType == VidType.STRING)

val srcIndex: Int = row.schema.fieldIndex(edgeConfig.sourceField)
var srcId: String = row.get(srcIndex).toString
var srcId: String = row.get(srcIndex).toString.trim
if (srcId.equals(DEFAULT_EMPTY_VALUE)) { srcId = "" }

val dstIndex: Int = row.schema.fieldIndex(edgeConfig.targetField)
var dstId: String = row.get(dstIndex).toString
var dstId: String = row.get(dstIndex).toString.trim
if (dstId.equals(DEFAULT_EMPTY_VALUE)) { dstId = "" }

if (edgeConfig.sourcePolicy.isDefined) {
Expand Down Expand Up @@ -344,7 +344,7 @@ class EdgeProcessor(spark: SparkSession,

val ranking: Long = if (edgeConfig.rankingField.isDefined) {
val rankIndex = row.schema.fieldIndex(edgeConfig.rankingField.get)
row.get(rankIndex).toString.toLong
row.get(rankIndex).toString.trim.toLong
} else {
0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ class VerticesProcessor(spark: SparkSession,
return false
}

val vertexId = row.get(index).toString
val vertexId = row.get(index).toString.trim
// process int type vid
if (tagConfig.vertexPolicy.isEmpty && !isVidStringType && !NebulaUtils.isNumic(vertexId)) {
printChoice(
Expand All @@ -203,7 +203,7 @@ class VerticesProcessor(spark: SparkSession,
fieldKeys: List[String],
fieldTypeMap: Map[String, Int]): Vertex = {
val index = row.schema.fieldIndex(tagConfig.vertexField)
var vertexId = row.get(index).toString
var vertexId = row.get(index).toString.trim
if (vertexId.equals(DEFAULT_EMPTY_VALUE)) {
vertexId = ""
}
Expand Down Expand Up @@ -231,7 +231,7 @@ class VerticesProcessor(spark: SparkSession,
isVertexValid(row, tagConfig, false, vidType == VidType.STRING)

val index: Int = row.schema.fieldIndex(tagConfig.vertexField)
var vertexId: String = row.get(index).toString
var vertexId: String = row.get(index).toString.trim
if (vertexId.equals(DEFAULT_EMPTY_VALUE)) {
vertexId = ""
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ class EdgeProcessor(spark: SparkSession,
if (index < 0 || row.isNullAt(index)) {
printChoice(streamFlag, s"rank must exist and cannot be null, your row data is $row")
}
val ranking = row.get(index).toString
val ranking = row.get(index).toString.trim
if (!NebulaUtils.isNumic(ranking)) {
printChoice(streamFlag,
s"Not support non-Numeric type for ranking field.your row data is $row")
Expand Down Expand Up @@ -240,7 +240,7 @@ class EdgeProcessor(spark: SparkSession,
if (index < 0 || row.isNullAt(index)) {
printChoice(streamFlag, s"$fieldType must exist and cannot be null, your row data is $row")
None
} else Some(row.get(index).toString)
} else Some(row.get(index).toString.trim)
}

val idFlag = fieldValue.isDefined
Expand Down Expand Up @@ -286,7 +286,7 @@ class EdgeProcessor(spark: SparkSession,

if (edgeConfig.rankingField.isDefined) {
val index = row.schema.fieldIndex(edgeConfig.rankingField.get)
val ranking = row.get(index).toString
val ranking = row.get(index).toString.trim
Edge(sourceField, targetField, Some(ranking.toLong), values)
} else {
Edge(sourceField, targetField, None, values)
Expand All @@ -307,7 +307,7 @@ class EdgeProcessor(spark: SparkSession,
indexCells(lat, lng).mkString(",")
} else {
val index = row.schema.fieldIndex(field)
val value = row.get(index).toString
val value = row.get(index).toString.trim
if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
}
// process string type vid
Expand All @@ -329,13 +329,13 @@ class EdgeProcessor(spark: SparkSession,
isEdgeValid(row, edgeConfig, false, vidType == VidType.STRING)

val srcIndex: Int = row.schema.fieldIndex(edgeConfig.sourceField)
var srcId: String = row.get(srcIndex).toString
var srcId: String = row.get(srcIndex).toString.trim
if (srcId.equals(DEFAULT_EMPTY_VALUE)) {
srcId = ""
}

val dstIndex: Int = row.schema.fieldIndex(edgeConfig.targetField)
var dstId: String = row.get(dstIndex).toString
var dstId: String = row.get(dstIndex).toString.trim
if (dstId.equals(DEFAULT_EMPTY_VALUE)) {
dstId = ""
}
Expand Down Expand Up @@ -367,7 +367,7 @@ class EdgeProcessor(spark: SparkSession,

val ranking: Long = if (edgeConfig.rankingField.isDefined) {
val rankIndex = row.schema.fieldIndex(edgeConfig.rankingField.get)
row.get(rankIndex).toString.toLong
row.get(rankIndex).toString.trim.toLong
} else {
0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ class VerticesProcessor(spark: SparkSession,
return false
}

val vertexId = row.get(index).toString
val vertexId = row.get(index).toString.trim
// process int type vid
if (tagConfig.vertexPolicy.isEmpty && !isVidStringType && !NebulaUtils.isNumic(vertexId)) {
printChoice(
Expand All @@ -222,7 +222,7 @@ class VerticesProcessor(spark: SparkSession,
fieldKeys: List[String],
fieldTypeMap: Map[String, Int]): Vertex = {
val index = row.schema.fieldIndex(tagConfig.vertexField)
var vertexId = row.get(index).toString
var vertexId = row.get(index).toString.trim
if (vertexId.equals(DEFAULT_EMPTY_VALUE)) {
vertexId = ""
}
Expand Down Expand Up @@ -250,7 +250,7 @@ class VerticesProcessor(spark: SparkSession,
isVertexValid(row, tagConfig, false, vidType == VidType.STRING)

val index: Int = row.schema.fieldIndex(tagConfig.vertexField)
var vertexId: String = row.get(index).toString
var vertexId: String = row.get(index).toString.trim
if (vertexId.equals(DEFAULT_EMPTY_VALUE)) {
vertexId = ""
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ class EdgeProcessor(spark: SparkSession,
if (index < 0 || row.isNullAt(index)) {
printChoice(streamFlag, s"rank must exist and cannot be null, your row data is $row")
}
val ranking = row.get(index).toString
val ranking = row.get(index).toString.trim
if (!NebulaUtils.isNumic(ranking)) {
printChoice(streamFlag,
s"Not support non-Numeric type for ranking field.your row data is $row")
Expand Down Expand Up @@ -239,7 +239,7 @@ class EdgeProcessor(spark: SparkSession,
if (index < 0 || row.isNullAt(index)) {
printChoice(streamFlag, s"$fieldType must exist and cannot be null, your row data is $row")
None
} else Some(row.get(index).toString)
} else Some(row.get(index).toString.trim)
}

val idFlag = fieldValue.isDefined
Expand Down Expand Up @@ -285,7 +285,7 @@ class EdgeProcessor(spark: SparkSession,

if (edgeConfig.rankingField.isDefined) {
val index = row.schema.fieldIndex(edgeConfig.rankingField.get)
val ranking = row.get(index).toString
val ranking = row.get(index).toString.trim
Edge(sourceField, targetField, Some(ranking.toLong), values)
} else {
Edge(sourceField, targetField, None, values)
Expand All @@ -306,7 +306,7 @@ class EdgeProcessor(spark: SparkSession,
indexCells(lat, lng).mkString(",")
} else {
val index = row.schema.fieldIndex(field)
val value = row.get(index).toString
val value = row.get(index).toString.trim
if (value.equals(DEFAULT_EMPTY_VALUE)) "" else value
}
// process string type vid
Expand All @@ -328,13 +328,13 @@ class EdgeProcessor(spark: SparkSession,
isEdgeValid(row, edgeConfig, false, vidType == VidType.STRING)

val srcIndex: Int = row.schema.fieldIndex(edgeConfig.sourceField)
var srcId: String = row.get(srcIndex).toString
var srcId: String = row.get(srcIndex).toString.trim
if (srcId.equals(DEFAULT_EMPTY_VALUE)) {
srcId = ""
}

val dstIndex: Int = row.schema.fieldIndex(edgeConfig.targetField)
var dstId: String = row.get(dstIndex).toString
var dstId: String = row.get(dstIndex).toString.trim
if (dstId.equals(DEFAULT_EMPTY_VALUE)) {
dstId = ""
}
Expand Down Expand Up @@ -366,7 +366,7 @@ class EdgeProcessor(spark: SparkSession,

val ranking: Long = if (edgeConfig.rankingField.isDefined) {
val rankIndex = row.schema.fieldIndex(edgeConfig.rankingField.get)
row.get(rankIndex).toString.toLong
row.get(rankIndex).toString.trim.toLong
} else {
0
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ class VerticesProcessor(spark: SparkSession,
fieldKeys: List[String],
fieldTypeMap: Map[String, Int]): Vertex = {
val index = row.schema.fieldIndex(tagConfig.vertexField)
var vertexId = row.get(index).toString
var vertexId = row.get(index).toString.trim
if (vertexId.equals(DEFAULT_EMPTY_VALUE)) {
vertexId = ""
}
Expand Down Expand Up @@ -250,7 +250,7 @@ class VerticesProcessor(spark: SparkSession,
isVertexValid(row, tagConfig, false, vidType == VidType.STRING)

val index: Int = row.schema.fieldIndex(tagConfig.vertexField)
var vertexId: String = row.get(index).toString
var vertexId: String = row.get(index).toString.trim
if (vertexId.equals(DEFAULT_EMPTY_VALUE)) {
vertexId = ""
}
Expand Down