Use Shx Index to split large shapefiles #146

Merged: 1 commit, Aug 14, 2017
24 changes: 23 additions & 1 deletion src/main/scala/magellan/ShapefileRelation.scala
@@ -20,11 +20,12 @@ import java.util.Objects

import magellan.io._
import magellan.mapreduce._
import org.apache.hadoop.io.{MapWritable, Text}
import org.apache.hadoop.io.{ArrayWritable, LongWritable, MapWritable, Text}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

import scala.collection.JavaConversions._
import scala.util.Try

/**
* A Shapefile relation is the entry point for working with Shapefile formats.
@@ -37,6 +38,27 @@ case class ShapeFileRelation(

protected override def _buildScan(): RDD[Array[Any]] = {

// read the shx files, if they exist
val fileNameToFileSplits = Try(sc.newAPIHadoopFile(
path + "/*.shx",
classOf[ShxInputFormat],
classOf[Text],
classOf[ArrayWritable]
).map { case (txt: Text, splits: ArrayWritable) =>
val fileName = txt.toString
val s = splits.get()
val size = s.length
var i = 0
val v = Array.fill(size)(0L)
while (i < size) {
v.update(i, s(i).asInstanceOf[LongWritable].get())
i += 1
}
(fileName, v)
}.collectAsMap())

fileNameToFileSplits.map(SplitInfos.SPLIT_INFO_MAP.set(_))

val shapefileRdd = sqlContext.sparkContext.newAPIHadoopFile(
path + "/*.shp",
classOf[ShapeInputFormat],
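With the .shx index wired in, nothing changes for callers of the relation. A minimal usage sketch (the path and selected columns are illustrative, assuming the standard magellan data source name):

// Hypothetical usage: point the data source at a directory containing *.shp
// files alongside their *.shx index files. When the .shx files are present,
// the relation above uses them to split large .shp files; when they are
// absent, each .shp file is read as a single split, as before.
val df = sqlContext.read
  .format("magellan")
  .load("/data/shapefiles")
df.select("polygon", "metadata").show()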
4 changes: 1 addition & 3 deletions src/main/scala/magellan/io/ShapeWritable.scala
@@ -22,7 +22,7 @@ import magellan.Shape
import org.apache.commons.io.EndianUtils
import org.apache.hadoop.io.Writable

private[magellan] class ShapeWritable(shapeType: Int) extends Writable {
private[magellan] class ShapeWritable extends Writable {

var shape: Shape = _

@@ -32,8 +32,6 @@ private[magellan] class ShapeWritable(shapeType: Int) extends Writable {

override def readFields(dataInput: DataInput): Unit = {
val shapeType = EndianUtils.swapInteger(dataInput.readInt())
// all records share the same type or nullshape.
require(this.shapeType == shapeType || shapeType == 0)
val h = shapeType match {
case 0 => new NullShapeReader()
case 1 => new PointReader()
66 changes: 61 additions & 5 deletions src/main/scala/magellan/mapreduce/ShapeInputFormat.scala
@@ -16,20 +16,76 @@

package magellan.mapreduce

import org.apache.hadoop.fs.Path
import com.google.common.base.Stopwatch
import magellan.io.{ShapeKey, ShapeWritable}
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.fs.{LocatedFileStatus, Path}
import org.apache.hadoop.mapreduce.lib.input._
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, TaskAttemptContext}

import magellan.io.{ShapeWritable, ShapeKey}
import scala.collection.JavaConversions._
import scala.collection.mutable.ListBuffer

private[magellan] class ShapeInputFormat extends FileInputFormat[ShapeKey, ShapeWritable] {
private[magellan] class ShapeInputFormat
extends FileInputFormat[ShapeKey, ShapeWritable] {

private val log = LogFactory.getLog(classOf[ShapeInputFormat])

override def createRecordReader(inputSplit: InputSplit,
taskAttemptContext: TaskAttemptContext) = {
new ShapefileReader
}

// TODO: Use DBIndex to figure out how to efficiently split files.
override def isSplitable(context: JobContext, filename: Path): Boolean = false
override def isSplitable(context: JobContext, filename: Path): Boolean = true

override def getSplits(job: JobContext): java.util.List[InputSplit] = {
val splitInfos = SplitInfos.SPLIT_INFO_MAP.get()
computeSplits(job, splitInfos)
}

private def computeSplits(
job: JobContext,
splitInfos: scala.collection.Map[String, Array[Long]]) = {

val sw = new Stopwatch().start
val splits = ListBuffer[InputSplit]()
val files = listStatus(job)
for (file <- files) {
val path = file.getPath
val length = file.getLen
val blkLocations = if (file.isInstanceOf[LocatedFileStatus]) {
file.asInstanceOf[LocatedFileStatus].getBlockLocations
} else {
val fs = path.getFileSystem(job.getConfiguration)
fs.getFileBlockLocations(file, 0, length)
}
val key = path.getName.split("\\.shp$")(0)
if (splitInfos == null || !splitInfos.containsKey(key)) {
val blkIndex = getBlockIndex(blkLocations, 0)
splits.+= (makeSplit(path, 0, length, blkLocations(blkIndex).getHosts,
blkLocations(blkIndex).getCachedHosts))
} else {
val s = splitInfos(key).toSeq
val start = s
val end = s.drop(1) ++ Seq(length)
start.zip(end).foreach { case (startOffset: Long, endOffset: Long) =>
val blkIndex = getBlockIndex(blkLocations, startOffset)
splits.+=(makeSplit(path, startOffset, endOffset - startOffset, blkLocations(blkIndex).getHosts,
blkLocations(blkIndex).getCachedHosts))
}
}
}
sw.stop
if (log.isDebugEnabled) {
log.debug("Total # of splits generated by getSplits: " + splits.size + ", TimeTaken: " + sw.elapsedMillis)
}
splits
}
}
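To make the pairing in computeSplits concrete, a small sketch with invented offsets: the start offsets come straight from the .shx-derived array, the end offsets are the same sequence shifted by one with the file length appended, and each (start, end) pair becomes one FileSplit.

// Hypothetical byte offsets produced by the .shx index for one .shp file:
val offsets = Array(100L, 67108964L, 134217828L)
val fileLength = 200000000L
val starts = offsets.toSeq
val ends = starts.drop(1) :+ fileLength
val ranges = starts.zip(ends).map { case (s, e) => (s, e - s) }  // (start, length) per split
// ranges == Seq((100, 67108864), (67108964, 67108864), (134217828, 65782172))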

object SplitInfos {

// TODO: Can we get rid of this hack to pass split calculation to the Shapefile Reader?
val SPLIT_INFO_MAP = new ThreadLocal[scala.collection.Map[String, Array[Long]]]

}
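SPLIT_INFO_MAP is a ThreadLocal, so the offsets set by ShapefileRelation above are only visible to getSplits if it runs on the same driver thread that set them, which is why the TODO calls this a hack. A minimal sketch of the hand-off (map contents invented):

// Driver side, before the .shp RDD's splits are computed:
val infos: scala.collection.Map[String, Array[Long]] =
  Map("census_tracts" -> Array(100L, 67108964L))  // hypothetical .shp name prefix -> record start offsets
SplitInfos.SPLIT_INFO_MAP.set(infos)
// Later, ShapeInputFormat.getSplits reads it back:
val lookedUp = SplitInfos.SPLIT_INFO_MAP.get()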
43 changes: 20 additions & 23 deletions src/main/scala/magellan/mapreduce/ShapefileReader.scala
@@ -32,11 +32,9 @@ private[magellan] class ShapefileReader extends RecordReader[ShapeKey, ShapeWrit

private var dis: DataInputStream = _

private var length: BigInt = _

private var remaining: BigInt = _

override def getProgress: Float = remaining.toFloat / length.toFloat
override def getProgress: Float = 0

override def nextKeyValue(): Boolean = {
if (remaining <= 0) {
@@ -47,7 +45,7 @@ private[magellan] class ShapefileReader extends RecordReader[ShapeKey, ShapeWrit
val recordNumber = dis.readInt()
// record numbers begin at 1
require(recordNumber > 0)
val contentLength = 16 * (dis.readInt() + 4)
val contentLength = 2 * (dis.readInt() + 4)
value.readFields(dis)
remaining -= contentLength
key.setRecordIndex(key.getRecordIndex() + 1)
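For context on the 16 -> 2 change above: the shapefile record header stores the content length in 16-bit words, and every record is preceded by an 8-byte record header, so the bytes consumed per record are 2 * (contentLengthWords + 4). A made-up record as a sanity check:

val contentLengthWords = 48                      // hypothetical value read from the record header
val recordBytes = 2 * (contentLengthWords + 4)   // 8 header bytes + 96 content bytes = 104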
@@ -60,27 +58,26 @@ private[magellan] class ShapefileReader extends RecordReader[ShapeKey, ShapeWrit
override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext) {
val split = inputSplit.asInstanceOf[FileSplit]
val job = MapReduceUtils.getConfigurationFromContext(taskAttemptContext)
val start = split.getStart()
val end = start + split.getLength()
val file = split.getPath()
val fs = file.getFileSystem(job)
val is = fs.open(split.getPath())

val path = split.getPath()
val fs = path.getFileSystem(job)
val is = fs.open(path)

val (start, end) = {
val v = split.getStart
if (v == 0) {
is.seek(24)
(100L, 2 * is.readInt().toLong)
} else {
(v, v + split.getLength)
}
}

is.seek(start)
dis = new DataInputStream(is)
require(is.readInt() == 9994)
// skip the next 20 bytes which should all be zero
0 until 5 foreach {_ => require(is.readInt() == 0)}
// file length in bits
val i: BigInt = is.readInt()
length = 16 * i - 50 * 16
remaining = length
val version = EndianUtils.swapInteger(is.readInt())
require(version == 1000)
// shape type: all the shapes in a given split have the same type
val shapeType = EndianUtils.swapInteger(is.readInt())
key.setFileNamePrefix(split.getPath.getName.split("\\.")(0))
value = new ShapeWritable(shapeType)
// skip the next 64 bytes
0 until 8 foreach {_ => is.readDouble()}
value = new ShapeWritable()
remaining = (end - start)
}

override def getCurrentKey: ShapeKey = key
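The (start, end) computation above treats the first split specially: a split that begins at byte 0 covers the 100-byte file header, so reading starts at offset 100 and the end is taken from the file-length field at byte 24, which is recorded in 16-bit words. A sketch with an invented header value:

// Hypothetical: the .shp header reports a file length of 40000000 16-bit words.
val headerLengthWords = 40000000L
val (start, end) = (100L, 2 * headerLengthWords)  // skip the header; end = 80000000 bytes
// Any later split simply uses its own boundaries, e.g. (67108964L, 67108964L + 67108864L).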
130 changes: 130 additions & 0 deletions src/main/scala/magellan/mapreduce/ShxInputFormat.scala
@@ -0,0 +1,130 @@
/**
* Copyright 2015 Ram Sriharsha
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package magellan.mapreduce

import java.io.DataInputStream

import org.apache.commons.io.EndianUtils
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}

import scala.collection.mutable.ListBuffer

class ShxInputFormat extends FileInputFormat[Text, ArrayWritable] {

override def createRecordReader(
split: InputSplit,
context: TaskAttemptContext): RecordReader[Text, ArrayWritable] = {
new ShxReader()
}

override def isSplitable(context: JobContext, filename: Path): Boolean = false
}

class ShxReader extends RecordReader[Text, ArrayWritable] {

private var dis: DataInputStream = _

override def getProgress: Float = ???

private var done: Boolean = false

private var splits:ArrayWritable = _

private var key: Text = new Text()

private val MAX_SPLIT_SIZE = "mapreduce.input.fileinputformat.split.maxsize"

private val MIN_SPLIT_SIZE = "mapreduce.input.fileinputformat.split.minsize"


override def nextKeyValue(): Boolean = if (done) false else {
done = true
true
}

override def getCurrentValue: ArrayWritable = {
splits
}

override def initialize(inputSplit: InputSplit, context: TaskAttemptContext): Unit = {
val split = inputSplit.asInstanceOf[FileSplit]
val job = MapReduceUtils.getConfigurationFromContext(context)
val start = split.getStart()
val end = start + split.getLength()
val path = split.getPath()
val fs = path.getFileSystem(job)
key.set(split.getPath.getName.split("\\.")(0))
val is = fs.open(path)
dis = new DataInputStream(is)
require(is.readInt() == 9994)
// skip the next 20 bytes which should all be zero
0 until 5 foreach {_ => require(is.readInt() == 0)}
// file length, in 16-bit words
val len = is.readInt()
val numRecords = (2 * len - 100) / 8

val version = EndianUtils.swapInteger(is.readInt())
require(version == 1000)
// shape type: all the shapes in a given split have the same type
is.readInt()

// skip the next 64 bytes
0 until 8 foreach {_ => is.readDouble()}

// iterate over the offset and content length of each record
var j = 0
val minSplitSize = job.getLong(MIN_SPLIT_SIZE, 1L)
val maxSplitSize = job.getLong(MAX_SPLIT_SIZE, Long.MaxValue)
val shpFileName = path.getName.replace("\\.shx$", "\\.shp")
val blockSize = fs.getFileStatus(new Path(path.getParent, shpFileName)).getBlockSize
val splitSize = Math.max(minSplitSize, Math.min(maxSplitSize, blockSize))

// split start offsets (in bytes) for the corresponding .shp file
val v = new ListBuffer[Writable]()

var startOffset: Long = Long.MinValue

while (j < numRecords) {
val offset = dis.readInt()
// skip the next 4 bytes (the content length)
dis.readInt()

if (startOffset == Long.MinValue) {
startOffset = offset
}
else if (offset - startOffset > splitSize) {
v.+= (new LongWritable(startOffset * 2))
startOffset = offset
}
j += 1
}

// if empty add starting offset
if (v.isEmpty) {
v.+= (new LongWritable(startOffset * 2))
}

splits = new ArrayWritable(classOf[LongWritable], v.toArray)
}

override def getCurrentKey: Text = key

override def close() {}
}
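To make the index arithmetic above concrete (all numbers invented): the .shx header stores the file length in 16-bit words, each index record is 8 bytes (a 4-byte offset plus a 4-byte content length), and offsets are themselves in 16-bit words, which is why split boundaries are emitted as startOffset * 2.

val shxLengthWords = 550
val numRecords = (2 * shxLengthWords - 100) / 8   // 100-byte header, 8 bytes per record => 125 records
val offsetWords = 4096L                           // hypothetical offset read from one index record
val offsetBytes = offsetWords * 2                 // 8192: where that record starts in the .shp file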
@@ -0,0 +1 @@
UTF-8
@@ -0,0 +1 @@
GEOGCS["GCS_North_American_1983",DATUM["D_North_American_1983",SPHEROID["GRS_1980",6378137,298.257222101]],PRIMEM["Greenwich",0],UNIT["Degree",0.017453292519943295]]