A library for parsing and querying an Esri File Geodatabase with Apache Spark.
This work in progress is a pure Scala read-only implementation based on this reverse engineered specification. Understanding the internal file structure enables partitioning to perform massively parallel reading. The reading API is based on the Hadoop File System API, enabling the placement of the GDB in HDFS or S3 (not tested) for multi-node access. There is still a lot to be done, but it is a good start. Eventually, I will merge this project with my Ibn Battuta Project.
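To illustrate the partitioning idea, here is a minimal sketch of splitting a table into contiguous row ranges that independent readers could process in parallel. It assumes rows can be addressed by index; the function name `split_rows` is hypothetical, and this is not necessarily how the library computes its partitions.

```python
def split_rows(num_rows, num_partitions):
    """Split row indices 0..num_rows-1 into contiguous (start, end) ranges.

    `end` is exclusive. Hypothetical helper for illustration only; the
    library's real partitioning logic may differ.
    """
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    base, extra = divmod(num_rows, num_partitions)
    ranges = []
    start = 0
    for i in range(num_partitions):
        # The first `extra` partitions take one additional row each.
        size = base + (1 if i < extra else 0)
        if size == 0:
            continue  # fewer rows than partitions: skip empty ranges
        ranges.append((start, start + size))
        start += size
    return ranges


print(split_rows(10, 3))  # [(0, 4), (4, 7), (7, 10)]
```

Each range can then be handed to a separate task, which is roughly what a `numPartitions` setting asks for.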
Update (Jan 7 2016) - for Polylines and Polygons, I'm mimicking the core Spark Vector class. In a way, they have almost the same semantics.
- Implement asGeometry
- Use Esri Geometry Library rather than JTS (I love JTS, so many utility functions on the geometry model)
- Implement Point, Polyline and Polygon as Spatial Type using UDT spec.
- Handle more shapes - multiXXX and with Z and M
- Read default values in field definitions
- Register custom Kryo serializer for shapes (optimization - but worth it :-)
- Perform a scan rather than a seek if the index row count is the same as the table count (should help performance)
- Test Multi part geometries
- Test XML field type
- Test Blob field type
- Handle Raster (super low priority)
- Make internal structure of Shapes more like GeoJSON, but that will be heavy for SerDe!
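The scan-versus-seek item in the list above can be sketched as follows. This is a toy model under stated assumptions: the table is a plain list of rows, the index is a list of row positions, and `read_rows` is a hypothetical name. It does not reflect the actual on-disk index format.

```python
def read_rows(data_rows, index_positions):
    """Return (strategy, rows) for a toy table and index.

    data_rows: rows in file order (toy stand-in for the table file).
    index_positions: row positions listed in the index (toy stand-in).
    """
    if len(index_positions) == len(data_rows):
        # Every row is indexed, so a sequential scan avoids per-row seeks.
        return "scan", list(data_rows)
    # Otherwise, seek to each indexed position individually.
    return "seek", [data_rows[i] for i in index_positions]


print(read_rows(["a", "b", "c"], [0, 1, 2]))  # ('scan', ['a', 'b', 'c'])
print(read_rows(["a", "b", "c"], [0, 2]))     # ('seek', ['a', 'c'])
```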
The build process of this project is based on Apache Maven:
export SPARK_LOCAL_IP=localhost
mvn install
The test data in src/test/resources/Test.gdb was generated using the ArcPy tool src/test/python/TestToolbox.pyt. Though the coordinates of the shapes are random, the coordinate values are also stored as attributes for testing. In the case of the Points feature class, the x/y coordinate values should match the values in the attributes X and Y, enabling cross checking during testing.
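A cross check of this kind could look like the sketch below. The rows are modeled as plain dictionaries with made-up sample values, and `point_matches_attributes` is a hypothetical helper; an actual test would read the rows from Test.gdb instead.

```python
import math


def point_matches_attributes(row, tolerance=1e-9):
    """Check that the shape's x/y equal the X and Y attribute values.

    `row` is a dict with a "Shape" (x, y) tuple plus "X" and "Y" values;
    this layout is an assumption made for illustration.
    """
    x, y = row["Shape"]
    return (math.isclose(x, row["X"], abs_tol=tolerance)
            and math.isclose(y, row["Y"], abs_tol=tolerance))


# Made-up sample rows: the second one deliberately disagrees on Y.
rows = [
    {"Shape": (33.5, -117.25), "X": 33.5, "Y": -117.25},
    {"Shape": (10.0, 20.0), "X": 10.0, "Y": 21.0},
]
print([point_matches_attributes(r) for r in rows])  # [True, False]
```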
$SPARK_HOME/bin/spark-shell --packages com.esri:spark-gdb:0.7
import com.esri.gdb._
import com.esri.udt._
sc.gdbFile("src/test/resources/Test.gdb", "Points", numPartitions = 2).map(row => {
row.getAs[PointType](row.fieldIndex("Shape"))
}).foreach(println)
val df = sqlContext.read.
format("com.esri.gdb").
option("path", "src/test/resources/Test.gdb").
option("name", "Points").
load()
df.printSchema()
df.registerTempTable("points")
sqlContext.sql("select * from points").show()
For the Spatial UDT (User Defined Types), I am following the VectorUDT implementation.
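The core of the UDT pattern is a pair of functions that convert between a user-facing object and a storage representation built from SQL-native values. The sketch below shows that round trip in plain Python, without Spark. The names `SketchPoint` and `SketchPointUDT` are hypothetical, and the storage layout (a tuple of two doubles) is an assumption rather than the layout this library uses.

```python
class SketchPoint:
    """Hypothetical user-facing point class (not the library's PointType)."""

    def __init__(self, x, y):
        self.x = float(x)
        self.y = float(y)

    def __eq__(self, other):
        return (isinstance(other, SketchPoint)
                and self.x == other.x and self.y == other.y)

    def __repr__(self):
        return "SketchPoint({}, {})".format(self.x, self.y)


class SketchPointUDT:
    """Mirrors the serialize/deserialize contract of a UDT."""

    @staticmethod
    def serialize(point):
        # User object -> storage form (assumed here: two doubles).
        return (point.x, point.y)

    @staticmethod
    def deserialize(datum):
        # Storage form -> user object.
        return SketchPoint(datum[0], datum[1])


p = SketchPoint(1, 2)
stored = SketchPointUDT.serialize(p)
print(stored)                              # (1.0, 2.0)
print(SketchPointUDT.deserialize(stored))  # SketchPoint(1.0, 2.0)
```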
In Scala:
val df = sqlContext.read.format("com.esri.gdb")
.option("path", path)
.option("name", name)
.option("numPartitions", "1")
.load()
df.printSchema()
df.registerTempTable(name)
sqlContext.udf.register("getX", (point: PointType) => point.x)
sqlContext.udf.register("getY", (point: PointType) => point.y)
sqlContext.udf.register("plus2", (point: PointType) => PointType(point.x + 2, point.y + 2))
sqlContext.sql(s"select getX(plus2(Shape)),getY(Shape) as y from $name")
.show(20)
In Python:
df = sqlContext.read \
.format("com.esri.gdb") \
.options(path="../../test/resources/Test.gdb", name=gdb_name, numPartitions="1") \
.load()
df.printSchema()
df.registerTempTable(gdb_name)
sqlContext.registerFunction("getX", lambda p: p.x, DoubleType())
sqlContext.registerFunction("getY", lambda p: p.y, DoubleType())
sqlContext.registerFunction("plus2", lambda p: PointType(p.x + 2, p.y + 2), PointUDT())
rows = sqlContext.sql("select plus2(Shape),X,Y from {}".format(gdb_name))
for row in rows.collect():
    print(row)
We will use Docker to bootstrap a Cloudera quickstart container instance.
I highly recommend the installation of the Docker Toolbox for a Docker quick start.
Compile the project with the quickstart profile:
mvn -Pquickstart clean package
Create a local Docker-enabled machine with 6 cores, 8 GB of RAM and 10 GB of virtual disk:
docker-machine create \
    --driver virtualbox \
    --virtualbox-cpu-count 6 \
    --virtualbox-memory 8192 \
    --virtualbox-disk-size 10240 \
    --virtualbox-no-vtx-check \
    quickstart
On Windows, I had to upgrade my machine instance:
docker-machine upgrade quickstart
Set up the docker environment:
eval $(docker-machine env quickstart)
Start a single node Hadoop instance with ZooKeeper, HDFS, YARN and Spark.
(This is a 4GB download, so go grab some coffee and walk your dog, it is gonna take a while ! But, you only have to do that once. Ah... the beauty of docker images :-)
We expose the ports for Cloudera Manager (7180), Hue (8888), NameNode (50070), DataNode (50075) and Impala (21050).
And to facilitate the moving of jars and test data from the host into the container, we map the /Users host folder onto the container /Users folder.
docker run \
    --privileged=true \
    --hostname=quickstart.cloudera \
    -v /Users:/Users \
    -p 7180:7180 \
    -p 8888:8888 \
    -p 50070:50070 \
    -p 50075:50075 \
    -p 21050:21050 \
    -t -i cloudera/quickstart:latest \
    /usr/bin/docker-quickstart
Copy Test.gdb to HDFS:
hadoop fs -mkdir /data
hadoop fs -put /Users/<YOUR_PATH>/spark-gdb/src/test/resources/Test.gdb /data
Start a Spark shell:
spark-shell --jars /Users/<YOUR_PATH>/spark-gdb/target/spark-gdb-0.7.jar
Submit a Spark Context job:
import com.esri.gdb._
import com.esri.udt.PointType
sc.gdbFile("hdfs:///data/Test.gdb", "Points", numPartitions = 2).map(row => {
row.getAs[PointType](row.fieldIndex("Shape"))
}).foreach(println)
Submit a SQL Context job:
val df = sqlContext.read.
format("com.esri.gdb").
option("path", "hdfs:///data/Test.gdb").
option("name", "Lines").
option("numPartitions", "2").
load()
df.registerTempTable("lines")
sqlContext.sql("select * from lines").show()
Set the terminal type on Windows to enable cursor movement:
set term=ansi
Start a Cloudera Manager (CM) instance:
/home/cloudera/cloudera-manager --express