Skip to content

Commit

Permalink
support sst ingest for geo (#22)
Browse files Browse the repository at this point in the history
* support sst ingest for geo

* fix

* add sst test for geo

* debug
  • Loading branch information
jievince authored Nov 11, 2021
1 parent f1ca58a commit c1f043a
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 4 deletions.
7 changes: 7 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,13 @@ target/
.idea/
.eclipse/
*.iml
.project
.bloop
.metals
.settings
.vscode
.classpath
.factorypath

spark-importer.ipr
spark-importer.iws
Expand Down
5 changes: 5 additions & 0 deletions nebula-exchange/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,11 @@
<artifactId>clickhouse-jdbc</artifactId>
<version>0.2.5</version>
</dependency>
<dependency>
<groupId>org.locationtech.jts</groupId>
<artifactId>jts-core</artifactId>
<version>1.16.1</version>
</dependency>
</dependencies>
<repositories>
<repository>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@

/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.exchange.processor

import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value}
import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value, Geography, Coordinate, Point, LineString, Polygon}
import com.vesoft.nebula.exchange.utils.NebulaUtils.DEFAULT_EMPTY_VALUE
import com.vesoft.nebula.exchange.utils.{HDFSUtils, NebulaUtils}
import com.vesoft.nebula.meta.PropertyType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, LongType, StringType}

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
/**
* processor is a converter.
* It is responsible for converting the dataframe row data into Nebula Graph's vertex or edge,
Expand Down Expand Up @@ -155,7 +157,9 @@ trait Processor extends Serializable {
row.get(index).toString.toLong
}
case PropertyType.GEOGRAPHY => {
throw new IllegalArgumentException("sst import does not support GEOGRAPHY property yet.")
val wkt = row.get(index).toString
val jtsGeom = new org.locationtech.jts.io.WKTReader().read(wkt)
convertJTSGeometryToGeography(jtsGeom)
}
}
}
Expand All @@ -172,4 +176,44 @@ trait Processor extends Serializable {
case StringType => row.getString(index).toLong
}
}

def convertJTSGeometryToGeography(jtsGeom: org.locationtech.jts.geom.Geometry): Geography = {
jtsGeom.getGeometryType match {
case "Point" => {
val jtsPoint = jtsGeom.asInstanceOf[org.locationtech.jts.geom.Point]
val jtsCoord = jtsPoint.getCoordinate
Geography.ptVal(new Point(new Coordinate(jtsCoord.x, jtsCoord.y)))
}
case "LineString" => {
val jtsLineString = jtsGeom.asInstanceOf[org.locationtech.jts.geom.LineString]
val jtsCoordList = jtsLineString.getCoordinates
val coordList = new ListBuffer[Coordinate]()
for (jtsCoord <- jtsCoordList) {
coordList += new Coordinate(jtsCoord.x, jtsCoord.y)
}
Geography.lsVal(new LineString(coordList.asJava))
}
case "Polygon" => {
val jtsPolygon = jtsGeom.asInstanceOf[org.locationtech.jts.geom.Polygon]
val coordListList = new java.util.ArrayList[java.util.List[Coordinate]]()
val jtsShell = jtsPolygon.getExteriorRing
val coordList = new ListBuffer[Coordinate]()
for (jtsCoord <- jtsShell.getCoordinates) {
coordList += new Coordinate(jtsCoord.x, jtsCoord.y)
}
coordListList.add(coordList.asJava)

val jtsHolesNum = jtsPolygon.getNumInteriorRing
for (i <- 0 until jtsHolesNum) {
val coordList = new ListBuffer[Coordinate]()
val jtsHole = jtsPolygon.getInteriorRingN(i)
for (jtsCoord <- jtsHole.getCoordinates) {
coordList += new Coordinate(jtsCoord.x, jtsCoord.y)
}
coordListList.add(coordList.asJava)
}
Geography.pgVal(new Polygon(coordListList))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
package scala.com.vesoft.nebula.exchange.processor

import com.vesoft.nebula.exchange.processor.Processor
import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value}
import com.vesoft.nebula.{Date, DateTime, NullType, Time, Value, Geography, Coordinate, Point, LineString, Polygon}
import com.vesoft.nebula.meta.PropertyType
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{
Expand Down Expand Up @@ -139,6 +139,28 @@ class ProcessorSuite extends Processor {
val nullValue = new Value()
nullValue.setNVal(NullType.__NULL__)
assert(extraValueForSST(row, "col14", map).equals(nullValue))

// POINT(3 8)
val geogPoint = Geography.ptVal(new Point(new Coordinate(3, 8)))
val geogPointExpect = extraValueForSST(row, "col15", map)

assert(geogPointExpect.equals(geogPoint))
// LINESTRING(3 8, 4.7 73.23)
val line = new java.util.ArrayList[Coordinate]()
line.add(new Coordinate(3, 8))
line.add(new Coordinate(4.7, 73.23))
val geogLineString = Geography.lsVal(new LineString(line))
assert(extraValueForSST(row, "col16", map).equals(geogLineString))
// POLYGON((0 1, 1 2, 2 3, 0 1))
val shell: java.util.List[Coordinate] = new java.util.ArrayList[Coordinate]()
shell.add(new Coordinate(0, 1))
shell.add(new Coordinate(1, 2))
shell.add(new Coordinate(2, 3))
shell.add(new Coordinate(0, 1))
val rings = new java.util.ArrayList[java.util.List[Coordinate]]()
rings.add(shell)
val geogPolygon = Geography.pgVal(new Polygon(rings))
assert(extraValueForSST(row, "col17", map).equals(geogPolygon))
}

/**
Expand Down

0 comments on commit c1f043a

Please sign in to comment.