
Commit

fix the microsec process (#34)
Nicole00 authored Dec 13, 2021
1 parent c6162ca commit dff8fd4
Showing 2 changed files with 52 additions and 27 deletions.
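
In short: the SST value conversion in Processor (exercised by extraValueForSST in the tests) previously split the whole time value on ":" and treated an optional fourth component as the microseconds, so fractional-second input such as 12:00:00.100 could not be handled. The fix splits the seconds token on "." in both the TIME and DATETIME branches, as the Processor diff below shows. A minimal standalone sketch of the new parsing, with plain case classes standing in for com.vesoft.nebula.Time and DateTime (the helper names here are illustrative, not the project's API):

```scala
// Minimal sketch of the fixed parsing (illustrative stand-ins, not the project's API).
case class Time(hour: Byte, minute: Byte, sec: Byte, microsec: Int)
case class DateTime(year: Short, month: Byte, day: Byte,
                    hour: Byte, minute: Byte, sec: Byte, microsec: Int)

object MicrosecParsingSketch {
  // "12:00:00.100" -> Time(12, 0, 0, 100); "12:00:00" -> Time(12, 0, 0, 0)
  def parseTime(value: String): Time = {
    val values = value.split(":")
    require(values.length >= 3, s"wrong format for time value: $value")
    val secs     = values(2).split("\\.") // seconds token, optionally "ss.micros"
    val sec      = secs(0).toByte
    val microSec = if (secs.length == 2) secs(1).toInt else 0
    Time(values(0).toByte, values(1).toByte, sec, microSec)
  }

  // "2021-01-01T12:00:00.100" or "2021-01-01 12:00:00.100" -> DateTime(...)
  def parseDateTime(value: String): DateTime = {
    val parts      = if (value.contains("T")) value.split("T") else value.split(" ")
    val dateValues = parts(0).split("-")
    val timeValues = parts(1).split(":")
    val secs       = timeValues(2).split("\\.")
    val microsec   = if (secs.length == 2) secs(1).toInt else 0
    DateTime(dateValues(0).toShort, dateValues(1).toByte, dateValues(2).toByte,
             timeValues(0).toByte, timeValues(1).toByte, secs(0).toByte, microsec)
  }

  def main(args: Array[String]): Unit = {
    println(parseTime("12:00:00.100"))                // Time(12,0,0,100)
    println(parseDateTime("2021-01-01T12:00:00.100")) // DateTime(2021,1,1,12,0,0,100)
  }
}
```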
@@ -1,19 +1,30 @@

/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.exchange.processor

import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value, Geography, Coordinate, Point, LineString, Polygon}
import com.vesoft.nebula.{
Date,
DateTime,
NullType,
Time,
Value,
Geography,
Coordinate,
Point,
LineString,
Polygon
}
import com.vesoft.nebula.exchange.utils.NebulaUtils.DEFAULT_EMPTY_VALUE
import com.vesoft.nebula.exchange.utils.{HDFSUtils, NebulaUtils}
import com.vesoft.nebula.meta.PropertyType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

/**
* processor is a converter.
* It is responsible for converting the dataframe row data into Nebula Graph's vertex or edge,
@@ -35,7 +46,7 @@ trait Processor extends Serializable {
* eg: convert attribute value 2020-01-01 to date("2020-01-01")
*
* Time type: add time() function for attribute value.
* eg: convert attribute value 12:12:12:1111 to time("12:12:12:1111")
* eg: convert attribute value 12:12:12.1111 to time("12:12:12.1111")
*
* DateTime type: add datetime() function for attribute value.
* eg: convert attribute value 2020-01-01T22:30:40 to datetime("2020-01-01T22:30:40")
@@ -98,10 +109,10 @@ trait Processor extends Serializable {
throw new UnsupportedOperationException(
s"wrong format for time value: ${row.get(index)}, correct format is 12:00:00:0000")
}
new Time(values(0).toByte,
values(1).toByte,
values(2).toByte,
if (values.length > 3) values(3).toInt else 0)
val secs: Array[String] = values(2).split("\\.")
val sec: Byte = secs(0).toByte
val microSec: Int = if (secs.length == 2) secs(1).toInt else 0
new Time(values(0).toByte, values(1).toByte, sec, microSec)
}
case PropertyType.DATE => {
val values = row.get(index).toString.split("-")
@@ -121,13 +132,13 @@ trait Processor extends Serializable {
} else {
throw new UnsupportedOperationException(
s"wrong format for datetime value: $rowValue, " +
s"correct format is 2020-01-01T12:00:00:0000 or 2020-01-01 12:00:00:0000")
s"correct format is 2020-01-01T12:00:00.0000 or 2020-01-01 12:00:00.0000")
}

if (dateTimeValue.size < 2) {
throw new UnsupportedOperationException(
s"wrong format for datetime value: $rowValue, " +
s"correct format is 2020-01-01T12:00:00:0000 or 2020-01-01 12:00:00:0000")
s"correct format is 2020-01-01T12:00:00.0000 or 2020-01-01 12:00:00.0000")
}

val dateValues = dateTimeValue(0).split("-")
@@ -136,16 +147,18 @@ trait Processor extends Serializable {
if (dateValues.size < 3 || timeValues.size < 3) {
throw new UnsupportedOperationException(
s"wrong format for datetime value: $rowValue, " +
s"correct format is 2020-01-01T12:00:00:0000 or 2020-01-01 12:00:00")
s"correct format is 2020-01-01T12:00:00.0000 or 2020-01-01 12:00:00")
}

val microsec: Int = if (timeValues.size == 4) timeValues(3).toInt else 0
val secs: Array[String] = timeValues(2).split("\\.")
val sec: Byte = secs(0).toByte
val microsec: Int = if (secs.length == 2) secs(1).toInt else 0
new DateTime(dateValues(0).toShort,
dateValues(1).toByte,
dateValues(2).toByte,
timeValues(0).toByte,
timeValues(1).toByte,
timeValues(2).toByte,
sec,
microsec)
}
case PropertyType.TIMESTAMP => {
@@ -157,7 +170,7 @@ trait Processor extends Serializable {
row.get(index).toString.toLong
}
case PropertyType.GEOGRAPHY => {
val wkt = row.get(index).toString
val wkt = row.get(index).toString
val jtsGeom = new org.locationtech.jts.io.WKTReader().read(wkt)
convertJTSGeometryToGeography(jtsGeom)
}
@@ -186,18 +199,18 @@ trait Processor extends Serializable {
}
case "LineString" => {
val jtsLineString = jtsGeom.asInstanceOf[org.locationtech.jts.geom.LineString]
val jtsCoordList = jtsLineString.getCoordinates
val coordList = new ListBuffer[Coordinate]()
val jtsCoordList = jtsLineString.getCoordinates
val coordList = new ListBuffer[Coordinate]()
for (jtsCoord <- jtsCoordList) {
coordList += new Coordinate(jtsCoord.x, jtsCoord.y)
}
Geography.lsVal(new LineString(coordList.asJava))
}
case "Polygon" => {
val jtsPolygon = jtsGeom.asInstanceOf[org.locationtech.jts.geom.Polygon]
val jtsPolygon = jtsGeom.asInstanceOf[org.locationtech.jts.geom.Polygon]
val coordListList = new java.util.ArrayList[java.util.List[Coordinate]]()
val jtsShell = jtsPolygon.getExteriorRing
val coordList = new ListBuffer[Coordinate]()
val jtsShell = jtsPolygon.getExteriorRing
val coordList = new ListBuffer[Coordinate]()
for (jtsCoord <- jtsShell.getCoordinates) {
coordList += new Coordinate(jtsCoord.x, jtsCoord.y)
}
@@ -206,7 +219,7 @@ trait Processor extends Serializable {
val jtsHolesNum = jtsPolygon.getNumInteriorRing
for (i <- 0 until jtsHolesNum) {
val coordList = new ListBuffer[Coordinate]()
val jtsHole = jtsPolygon.getInteriorRingN(i)
val jtsHole = jtsPolygon.getInteriorRingN(i)
for (jtsCoord <- jtsHole.getCoordinates) {
coordList += new Coordinate(jtsCoord.x, jtsCoord.y)
}
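
The geography branch in the same file (only reformatted by this commit) reads a WKT string with JTS and then walks the resulting geometry into Nebula's Coordinate / Point / LineString / Polygon values via Geography.ptVal and Geography.lsVal. A rough sketch of that traversal covering only the JTS side (the nebula-specific wrapping is omitted; assumes org.locationtech.jts is on the classpath):

```scala
import org.locationtech.jts.geom.{LineString, Point, Polygon}
import org.locationtech.jts.io.WKTReader

object WktTraversalSketch {
  // Extracts (x, y) pairs per point/segment/ring from a WKT string.
  // In Processor these pairs become com.vesoft.nebula.Coordinate values and are
  // wrapped into the corresponding Geography value.
  def coordinates(wkt: String): Seq[Seq[(Double, Double)]] = {
    val geom = new WKTReader().read(wkt)
    geom.getGeometryType match {
      case "Point" =>
        val p = geom.asInstanceOf[Point]
        Seq(Seq((p.getX, p.getY)))
      case "LineString" =>
        val ls = geom.asInstanceOf[LineString]
        Seq(ls.getCoordinates.toSeq.map(c => (c.x, c.y)))
      case "Polygon" =>
        val pg    = geom.asInstanceOf[Polygon]
        val shell = pg.getExteriorRing.getCoordinates.toSeq.map(c => (c.x, c.y))
        val holes = (0 until pg.getNumInteriorRing).map { i =>
          pg.getInteriorRingN(i).getCoordinates.toSeq.map(c => (c.x, c.y))
        }
        shell +: holes
      case other =>
        throw new UnsupportedOperationException(s"unsupported geometry type: $other")
    }
  }

  def main(args: Array[String]): Unit = {
    println(coordinates("POINT(3 8)"))           // => one pair: (3.0,8.0)
    println(coordinates("LINESTRING(3 8, 4 9)")) // => (3.0,8.0), (4.0,9.0)
  }
}
```

The remaining hunks below update ProcessorSuite to exercise the new fractional-second behaviour.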
@@ -6,7 +6,18 @@
package scala.com.vesoft.nebula.exchange.processor

import com.vesoft.nebula.exchange.processor.Processor
import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value, Geography, Coordinate, Point, LineString, Polygon}
import com.vesoft.nebula.{
Date,
DateTime,
NullType,
Time,
Value,
Geography,
Coordinate,
Point,
LineString,
Polygon
}
import com.vesoft.nebula.meta.PropertyType
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{
@@ -30,8 +41,8 @@ class ProcessorSuite extends Processor {
1000,
100000,
"2021-01-01",
"2021-01-01T12:00:00",
"12:00:00",
"2021-01-01T12:00:00.100",
"12:00:00.100",
"2021-01-01T12:00:00",
true,
12.01,
@@ -92,8 +103,9 @@ class ProcessorSuite extends Processor {
assert(extraValueForClient(row, "col6", map).toString.toLong == 100000)
assert(extraValueForClient(row, "col7", map).toString.equals("date(\"2021-01-01\")"))
assert(
extraValueForClient(row, "col8", map).toString.equals("datetime(\"2021-01-01T12:00:00\")"))
assert(extraValueForClient(row, "col9", map).toString.equals("time(\"12:00:00\")"))
extraValueForClient(row, "col8", map).toString
.equals("datetime(\"2021-01-01T12:00:00.100\")"))
assert(extraValueForClient(row, "col9", map).toString.equals("time(\"12:00:00.100\")"))
assert(
extraValueForClient(row, "col10", map).toString.equals("timestamp(\"2021-01-01T12:00:00\")"))
assert(extraValueForClient(row, "col11", map).toString.toBoolean)
@@ -120,10 +132,10 @@ class ProcessorSuite extends Processor {
assert(extraValueForSST(row, "col6", map).toString.toLong == 100000)
val date = new Date(2021, 1, 1)
assert(extraValueForSST(row, "col7", map).equals(date))
val datetime = new DateTime(2021, 1, 1, 12, 0, 0, 0)
val datetime = new DateTime(2021, 1, 1, 12, 0, 0, 100)
assert(extraValueForSST(row, "col8", map).equals(datetime))

val time = new Time(12, 0, 0, 0)
val time = new Time(12, 0, 0, 100)
assert(extraValueForSST(row, "col9", map).equals(time))

try {
Expand All @@ -141,7 +153,7 @@ class ProcessorSuite extends Processor {
assert(extraValueForSST(row, "col14", map).equals(nullValue))

// POINT(3 8)
val geogPoint = Geography.ptVal(new Point(new Coordinate(3, 8)))
val geogPoint = Geography.ptVal(new Point(new Coordinate(3, 8)))
val geogPointExpect = extraValueForSST(row, "col15", map)

assert(geogPointExpect.equals(geogPoint))
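
The ProcessorSuite fixtures above switch from "12:00:00" / "2021-01-01T12:00:00" to fractional-second values ("12:00:00.100", "2021-01-01T12:00:00.100") because the removed code could not parse them. A quick reproduction of the old failure (plain Scala, no project dependencies):

```scala
object OldParsingRepro {
  def main(args: Array[String]): Unit = {
    val values = "12:00:00.100".split(":")
    println(values.mkString(", ")) // 12, 00, 00.100 => three tokens, passes the length check
    // The removed code then called values(2).toByte, which cannot parse "00.100":
    try {
      values(2).toByte
    } catch {
      case e: NumberFormatException => println(s"old code failed with: $e")
    }
    // The fixed code splits the seconds token first:
    val secs = values(2).split("\\.")
    println((secs(0).toByte, if (secs.length == 2) secs(1).toInt else 0)) // (0,100)
  }
}
```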
