Commit

add geotiff reader into codebase
Signed-off-by: Grigory Pomadchin <gr.pomadchin@gmail.com>
pomadchin committed Mar 28, 2017
1 parent 2cd357a commit da40aa3
Showing 5 changed files with 73 additions and 47 deletions.
3 changes: 1 addition & 2 deletions spark/src/main/scala/geotrellis/spark/package.scala
@@ -24,7 +24,6 @@ import geotrellis.spark.tiling._
import geotrellis.spark.ingest._
import geotrellis.spark.crop._
import geotrellis.spark.filter._
-
import org.apache.spark.{Partitioner, SparkContext}
import org.apache.spark.rdd._
import spire.syntax.cfor._
@@ -60,7 +59,7 @@ package object spark
    with summary.Implicits
    with summary.polygonal.Implicits
    with tiling.Implicits
-    with spark.raster.Implicits {
+    with spark.raster.io.Implicits {
  type TileLayerRDD[K] = RDD[(K, Tile)] with Metadata[TileLayerMetadata[K]]
  object TileLayerRDD {
    def apply[K](rdd: RDD[(K, Tile)], metadata: TileLayerMetadata[K]): TileLayerRDD[K] =
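With `spark.raster.io.Implicits` mixed into the `spark` package object, the Hadoop write extensions come along with the usual top-level import. A minimal sketch of the intended call site (the output path and the tiff value are illustrative, not part of this commit):

import geotrellis.spark._ // brings spark.raster.io.Implicits into scope
import geotrellis.raster.io.geotiff.SinglebandGeoTiff
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext

// The one-argument write comes from HadoopWriteMethods; the implicit
// SparkContext supplies the Hadoop Configuration.
def saveTiff(tiff: SinglebandGeoTiff)(implicit sc: SparkContext): Unit =
  tiff.write(new Path("hdfs:///tmp/output.tif"))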
@@ -0,0 +1,34 @@
+package geotrellis.spark.raster.io
+
+import geotrellis.raster.io.geotiff.{MultibandGeoTiff, SinglebandGeoTiff}
+import geotrellis.raster.io.geotiff.reader._
+import geotrellis.vector.Extent
+
+import org.apache.commons.io.IOUtils
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.spark.SparkContext
+
+object GeoTiffHadoopReader {
+  def readSingleband(path: Path)(implicit sc: SparkContext): SinglebandGeoTiff = readSingleband(path, decompress = true, streaming = false, None, sc.hadoopConfiguration)
+  def readSingleband(path: Path, decompress: Boolean, streaming: Boolean, extent: Option[Extent], conf: Configuration): SinglebandGeoTiff = {
+    HadoopRWMethods.read(path, conf) { is =>
+      val geoTiff = GeoTiffReader.readSingleband(IOUtils.toByteArray(is), decompress, streaming)
+      extent match {
+        case Some(e) => geoTiff.crop(e)
+        case _ => geoTiff
+      }
+    }
+  }
+
+  def readMultiband(path: Path)(implicit sc: SparkContext): MultibandGeoTiff = readMultiband(path, decompress = true, streaming = false, None, sc.hadoopConfiguration)
+  def readMultiband(path: Path, decompress: Boolean, streaming: Boolean, extent: Option[Extent], conf: Configuration): MultibandGeoTiff = {
+    HadoopRWMethods.read(path, conf) { is =>
+      val geoTiff = GeoTiffReader.readMultiband(IOUtils.toByteArray(is), decompress, streaming)
+      extent match {
+        case Some(e) => geoTiff.crop(e)
+        case _ => geoTiff
+      }
+    }
+  }
+}
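A sketch of how `GeoTiffHadoopReader` can be called, using only the signatures above; the HDFS path and the extent are illustrative:

import geotrellis.spark.raster.io.GeoTiffHadoopReader
import geotrellis.vector.Extent
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext

def readExamples(implicit sc: SparkContext): Unit = {
  // Short form: decompressed, non-streaming, whole file; the
  // Configuration is taken from the implicit SparkContext.
  val single = GeoTiffHadoopReader.readSingleband(new Path("hdfs:///data/raster.tif"))

  // Full form: crop a multiband tiff to an extent while reading.
  val cropped = GeoTiffHadoopReader.readMultiband(
    new Path("hdfs:///data/raster.tif"),
    decompress = true,
    streaming = false,
    extent = Some(Extent(0.0, 0.0, 10.0, 10.0)),
    conf = sc.hadoopConfiguration
  )
}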
@@ -14,25 +14,24 @@
 * limitations under the License.
 */

-package geotrellis.spark.raster
+package geotrellis.spark.raster.io

-import geotrellis.util.MethodExtensions
+import java.io.{DataInputStream, DataOutputStream}

+import geotrellis.util.MethodExtensions
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.spark.SparkContext

-import java.io.DataOutputStream
-
trait HadoopWriteMethods[T] extends MethodExtensions[T] {
  def write(path: Path)(implicit sc: SparkContext): Unit = write(path, gzip = false)
  def write(path: Path, gzip: Boolean)(implicit sc: SparkContext): Unit = write(path, gzip, sc.hadoopConfiguration)
  def write(path: Path, conf: Configuration): Unit = write(path, gzip = false, conf)
  def write(path: Path, gzip: Boolean, conf: Configuration): Unit
}

-object HadoopWriteMethods {
+object HadoopRWMethods {
  def write(path: Path, gzip: Boolean, conf: Configuration)(dosWrite: DataOutputStream => Unit): Unit = {
    val fs = FileSystem.get(conf)

@@ -63,4 +62,30 @@ object HadoopWriteMethods {
      os.close
    }
  }
+
+  def read[T](path: Path, conf: Configuration)(disRead: DataInputStream => T): T = {
+    val fs = FileSystem.get(conf)
+
+    val is = {
+      val factory = new CompressionCodecFactory(conf)
+      val codec = factory.getCodec(path)
+
+      if (codec == null) {
+        println(s"No codec found for $path, reading without compression.")
+        fs.open(path)
+      } else {
+        codec.createInputStream(fs.open(path))
+      }
+    }
+    try {
+      val dis = new DataInputStream(is)
+      try {
+        disRead(dis)
+      } finally {
+        dis.close
+      }
+    } finally {
+      is.close
+    }
+  }
}
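The new `read` mirrors the existing `write`: `CompressionCodecFactory` picks a codec from the file extension (e.g. `.gz`), the stream is wrapped in a `DataInputStream`, and the caller's function does the actual deserialization. A sketch of a generic caller, with an assumed byte-slurping helper built on commons-io:

import geotrellis.spark.raster.io.HadoopRWMethods
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Slurp a (possibly compressed) HDFS file into memory; a ".gz" suffix
// makes CompressionCodecFactory pick the gzip codec automatically.
def readBytes(path: Path, conf: Configuration): Array[Byte] =
  HadoopRWMethods.read(path, conf) { is => IOUtils.toByteArray(is) }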
@@ -14,7 +14,7 @@
 * limitations under the License.
 */

-package geotrellis.spark.raster
+package geotrellis.spark.raster.io

import geotrellis.raster.CellGrid
import geotrellis.raster.io.geotiff.GeoTiff
@@ -29,16 +29,16 @@ object Implicits extends Implicits
trait Implicits {
  implicit class withJpgHadoopWriteMethods(val self: Jpg) extends HadoopWriteMethods[Jpg] {
    def write(path: Path, gzip: Boolean, conf: Configuration): Unit =
-      HadoopWriteMethods.write(path, gzip, conf) { _.write(self.bytes) }
+      HadoopRWMethods.write(path, gzip, conf) { _.write(self.bytes) }
  }

  implicit class withPngHadoopWriteMethods(val self: Png) extends HadoopWriteMethods[Png] {
    def write(path: Path, gzip: Boolean, conf: Configuration): Unit =
-      HadoopWriteMethods.write(path, gzip, conf) { _.write(self.bytes) }
+      HadoopRWMethods.write(path, gzip, conf) { _.write(self.bytes) }
  }

  implicit class withGeoTiffHadoopWriteMethods[T <: CellGrid](val self: GeoTiff[T]) extends HadoopWriteMethods[GeoTiff[T]] {
    def write(path: Path, gzip: Boolean, conf: Configuration): Unit =
-      HadoopWriteMethods.write(path, gzip, conf) { new GeoTiffWriter(self, _).write() }
+      HadoopRWMethods.write(path, gzip, conf) { new GeoTiffWriter(self, _).write() }
  }
}
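With these implicits in scope, `write` attaches directly to `Jpg`, `Png`, and `GeoTiff` values. A sketch assuming `Png` lives in `geotrellis.raster.render` (the output path is illustrative):

import geotrellis.raster.render.Png
import geotrellis.spark.raster.io.Implicits._
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext

// gzip = true wraps the output stream in a gzip codec before the
// Png bytes are written.
def savePng(png: Png)(implicit sc: SparkContext): Unit =
  png.write(new Path("hdfs:///tmp/tile.png"), gzip = true)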
@@ -14,50 +14,18 @@
 * limitations under the License.
 */

-package geotrellis.spark.raster
+package geotrellis.spark.raster.io

-import geotrellis.spark._
import geotrellis.raster.io.geotiff._
-import geotrellis.raster.io.geotiff.reader._
import geotrellis.raster.testkit._
+import geotrellis.spark._
import geotrellis.spark.testkit.TestEnvironment

-import org.apache.commons.io.IOUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.io.compress.CompressionCodecFactory
+import org.apache.hadoop.fs.Path
import org.scalatest._

-import java.io._

-object RasterHadoopReader {
-  def apply[T](path: Path, conf: Configuration)(dosRead: DataInputStream => T): T = {
-    val fs = FileSystem.get(conf)
-
-    val is = {
-      val factory = new CompressionCodecFactory(conf)
-      val codec = factory.getCodec(path)
-
-      if (codec == null) {
-        println(s"No codec found for $path, reading without compression.")
-        fs.open(path)
-      } else {
-        codec.createInputStream(fs.open(path))
-      }
-    }
-    try {
-      val dos = new DataInputStream(is)
-      try {
-        dosRead(dos)
-      } finally {
-        dos.close
-      }
-    } finally {
-      is.close
-    }
-  }
-}

class HadoopWriterSpec extends FunSpec
  with Matchers
  with BeforeAndAfterAll
@@ -79,7 +47,7 @@ class HadoopWriterSpec extends FunSpec

    geoTiff.write(new Path(path))

-    val actualTiff = RasterHadoopReader(new Path(path), sc.hadoopConfiguration) { is => GeoTiffReader.readMultiband(IOUtils.toByteArray(is)) }
+    val actualTiff = GeoTiffHadoopReader.readMultiband(new Path(path))
    val actual = actualTiff.tile
    val actualTags = actualTiff.tags
