Commit e0eef5d

Refactored function naming / packages, removed explicit gzip flag.

Signed-off-by: Grigory Pomadchin <gr.pomadchin@gmail.com>
pomadchin committed Mar 31, 2017
1 parent da84e38 commit e0eef5d
Showing 8 changed files with 74 additions and 103 deletions.
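
In short: the writers no longer take a gzip flag; compression is now inferred from the output path's extension, and the raster IO helpers moved from geotrellis.spark.raster.io into geotrellis.spark.io.hadoop under Hadoop-prefixed names. A minimal REPL-style sketch of the resulting API, assuming the io.hadoop package object mixes in the Implicits trait shown below, as is the GeoTrellis convention (the /tmp paths are hypothetical; everything else is named in the diff):

import geotrellis.raster.io.geotiff.MultibandGeoTiff
import geotrellis.spark.io.hadoop._
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext

def roundTrip(implicit sc: SparkContext): Unit = {
  val geoTiff = MultibandGeoTiff("raster-test/data/aspect.tif") // sample raster used by the spec below

  geoTiff.write(new Path("/tmp/aspect.tif"))    // no codec matches ".tif": written uncompressed
  geoTiff.write(new Path("/tmp/aspect.tif.gz")) // GzipCodec inferred from ".gz": written gzipped

  val readBack = HadoopGeoTiffReader.readMultiband(new Path("/tmp/aspect.tif.gz"))
}
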
@@ -1,29 +1,27 @@
-package geotrellis.spark.raster.io
+package geotrellis.spark.io.hadoop
 
-import geotrellis.raster.io.geotiff.{MultibandGeoTiff, SinglebandGeoTiff}
 import geotrellis.raster.io.geotiff.reader._
+import geotrellis.raster.io.geotiff.{MultibandGeoTiff, SinglebandGeoTiff}
 import geotrellis.vector.Extent
 
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkContext
 
-object GeoTiffHadoopReader {
+object HadoopGeoTiffReader {
   def readSingleband(path: Path)(implicit sc: SparkContext): SinglebandGeoTiff = readSingleband(path, decompress = true, streaming = false, None, sc.hadoopConfiguration)
-  def readSingleband(path: Path, decompress: Boolean, streaming: Boolean, extent: Option[Extent], conf: Configuration): SinglebandGeoTiff = {
-    HadoopRWMethods.read(path, conf) { is =>
-      val geoTiff = GeoTiffReader.readSingleband(IOUtils.toByteArray(is), decompress, streaming)
-      extent match {
-        case Some(e) => geoTiff.crop(e)
-        case _ => geoTiff
-      }
+  def readSingleband(path: Path, decompress: Boolean, streaming: Boolean, extent: Option[Extent], conf: Configuration): SinglebandGeoTiff = HadoopRasterMethods.read(path, conf) { is =>
+    val geoTiff = GeoTiffReader.readSingleband(IOUtils.toByteArray(is), decompress, streaming)
+    extent match {
+      case Some(e) => geoTiff.crop(e)
+      case _ => geoTiff
     }
   }
 
   def readMultiband(path: Path)(implicit sc: SparkContext): MultibandGeoTiff = readMultiband(path, decompress = true, streaming = false, None, sc.hadoopConfiguration)
   def readMultiband(path: Path, decompress: Boolean, streaming: Boolean, extent: Option[Extent], conf: Configuration): MultibandGeoTiff = {
-    HadoopRWMethods.read(path, conf) { is =>
+    HadoopRasterMethods.read(path, conf) { is =>
       val geoTiff = GeoTiffReader.readMultiband(IOUtils.toByteArray(is), decompress, streaming)
       extent match {
         case Some(e) => geoTiff.crop(e)
…
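
The renamed reader keeps the optional crop extent. A hedged REPL-style sketch using the imports shown above, assuming a Hadoop Configuration named conf is in scope (the path and extent values are hypothetical):

val tiff = HadoopGeoTiffReader.readSingleband(
  new Path("hdfs:///data/raster.tif"),
  decompress = true,
  streaming = false,
  Some(Extent(0.0, 0.0, 10.0, 10.0)), // crop the GeoTiff to this extent after reading
  conf
)
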
@@ -1,4 +1,4 @@
-package geotrellis.spark.raster.io
+package geotrellis.spark.io.hadoop
 
 import geotrellis.raster.render.Jpg
 
@@ -7,7 +7,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkContext
 
-object JpgHadoopReader {
+object HadoopJpgReader {
   def read(path: Path)(implicit sc: SparkContext): Jpg = read(path, sc.hadoopConfiguration)
-  def read(path: Path, conf: Configuration): Jpg = HadoopRWMethods.read(path, conf) { is => Jpg(IOUtils.toByteArray(is)) }
+  def read(path: Path, conf: Configuration): Jpg = HadoopRasterMethods.read(path, conf) { is => Jpg(IOUtils.toByteArray(is)) }
 }
@@ -1,13 +1,12 @@
-package geotrellis.spark.raster.io
+package geotrellis.spark.io.hadoop
 
 import geotrellis.raster.render.Png
 
 import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkContext
 
-object PngHadoopReader {
+object HadoopPngReader {
   def read(path: Path)(implicit sc: SparkContext): Png = read(path, sc.hadoopConfiguration)
-  def read(path: Path, conf: Configuration): Png = HadoopRWMethods.read(path, conf) { is => Png(IOUtils.toByteArray(is)) }
+  def read(path: Path, conf: Configuration): Png = HadoopRasterMethods.read(path, conf) { is => Png(IOUtils.toByteArray(is)) }
 }
@@ -14,43 +14,37 @@
  * limitations under the License.
  */
 
-package geotrellis.spark.raster.io
-
-import java.io.{DataInputStream, DataOutputStream}
+package geotrellis.spark.io.hadoop
 
 import geotrellis.util.MethodExtensions
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.compress.CompressionCodecFactory
 import org.apache.spark.SparkContext
+import java.io.{DataInputStream, DataOutputStream}
 
 
-trait HadoopWriteMethods[T] extends MethodExtensions[T] {
-  def write(path: Path)(implicit sc: SparkContext): Unit = write(path, gzip = false)
-  def write(path: Path, gzip: Boolean)(implicit sc: SparkContext): Unit = write(path, gzip, sc.hadoopConfiguration)
-  def write(path: Path, conf: Configuration): Unit = write(path, gzip = false, conf)
-  def write(path: Path, gzip: Boolean, conf: Configuration): Unit
+trait HadoopRasterMethods[T] extends MethodExtensions[T] {
+  def write(path: Path)(implicit sc: SparkContext): Unit = write(path, sc.hadoopConfiguration)
+  def write(path: Path, conf: Configuration): Unit
 }
 
-object HadoopRWMethods {
-  def write(path: Path, gzip: Boolean, conf: Configuration)(dosWrite: DataOutputStream => Unit): Unit = {
+object HadoopRasterMethods {
+  def write(path: Path, conf: Configuration)(dosWrite: DataOutputStream => Unit): Unit = {
     val fs = FileSystem.get(conf)
 
-    val os =
-      if (!gzip) {
+    val os = {
+      val factory = new CompressionCodecFactory(conf)
+      val codec = factory.getCodec(path)
+
+      if (codec == null) {
+        println(s"No codec found for $path, writing without compression.")
         fs.create(path)
       } else {
-        val factory = new CompressionCodecFactory(conf)
-        val outputUri = new Path(s"${path.toUri.toString}.gz")
-
-        val codec = factory.getCodec(outputUri)
-
-        if (codec == null) {
-          println(s"No codec found for $outputUri, writing without compression.")
-          fs.create(path)
-        } else {
-          codec.createOutputStream(fs.create(outputUri))
-        }
+        codec.createOutputStream(fs.create(path))
       }
     }
     try {
      val dos = new DataOutputStream(os)
      try {
…
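
This rewrite is what lets the gzip flag disappear: Hadoop's CompressionCodecFactory resolves a codec from the file extension, so the target path alone decides whether the stream is compressed. A standalone REPL-style sketch of that lookup (the /tmp paths are hypothetical):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

val conf = new Configuration()
val factory = new CompressionCodecFactory(conf)

factory.getCodec(new Path("/tmp/tile.tif"))    // null: no codec registered for ".tif"
factory.getCodec(new Path("/tmp/tile.tif.gz")) // GzipCodec: the output stream gets wrapped

One behavioral change worth noting: the old code appended ".gz" to the path when gzip was requested, while the new code writes exactly to the path it is given.
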
24 changes: 24 additions & 0 deletions spark/src/main/scala/geotrellis/spark/io/hadoop/Implicits.scala
@@ -16,6 +16,13 @@
 
 package geotrellis.spark.io.hadoop
 
+import geotrellis.raster.CellGrid
+import geotrellis.raster.io.geotiff.GeoTiff
+import geotrellis.raster.io.geotiff.writer.GeoTiffWriter
+import geotrellis.raster.render.{Jpg, Png}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.spark._
 import org.apache.spark.rdd._
 
@@ -25,4 +32,21 @@ trait Implicits {
   implicit class HadoopSparkContextMethodsWrapper(val sc: SparkContext) extends HadoopSparkContextMethods
   implicit class withSaveBytesToHadoopMethods[K](rdd: RDD[(K, Array[Byte])]) extends SaveBytesToHadoopMethods[K](rdd)
   implicit class withSaveToHadoopMethods[K,V](rdd: RDD[(K,V)]) extends SaveToHadoopMethods[K, V](rdd)
+
+  implicit class withJpgHadoopWriteMethods(val self: Jpg) extends HadoopRasterMethods[Jpg] {
+    def write(path: Path, conf: Configuration): Unit =
+      HadoopRasterMethods.write(path, conf) { _.write(self.bytes) }
+  }
+
+  implicit class withPngHadoopWriteMethods(val self: Png) extends HadoopRasterMethods[Png] {
+    def write(path: Path, conf: Configuration): Unit =
+      HadoopRasterMethods.write(path, conf) { _.write(self.bytes) }
+  }
+
+  implicit class withGeoTiffHadoopWriteMethods[T <: CellGrid](val self: GeoTiff[T]) extends HadoopRasterMethods[GeoTiff[T]] {
+    def write(path: Path, conf: Configuration): Unit =
+      HadoopRasterMethods.write(path, conf) { new GeoTiffWriter(self, _).write() }
+  }
+
+
 }
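
The three implicit classes added above follow GeoTrellis's MethodExtensions pattern: the wrapper exposes the wrapped value as self and contributes the extra methods. A self-contained sketch of the same pattern on an unrelated type (all names here are illustrative, not part of the commit):

import geotrellis.util.MethodExtensions

object GreetingSyntax {
  trait GreetingMethods extends MethodExtensions[String] {
    def greet: String = s"Hello, $self!"
  }
  implicit class withGreetingMethods(val self: String) extends GreetingMethods
}

import GreetingSyntax._
"GeoTrellis".greet // "Hello, GeoTrellis!"
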
4 changes: 1 addition & 3 deletions spark/src/main/scala/geotrellis/spark/package.scala
@@ -24,7 +24,6 @@ import geotrellis.spark.tiling._
 import geotrellis.spark.ingest._
 import geotrellis.spark.crop._
 import geotrellis.spark.filter._
-
 import org.apache.spark.{Partitioner, SparkContext}
 import org.apache.spark.rdd._
 import spire.syntax.cfor._
@@ -59,8 +58,7 @@ package object spark
   with stitch.Implicits
   with summary.Implicits
   with summary.polygonal.Implicits
-  with tiling.Implicits
-  with spark.raster.io.Implicits {
+  with tiling.Implicits {
   type TileLayerRDD[K] = RDD[(K, Tile)] with Metadata[TileLayerMetadata[K]]
   object TileLayerRDD {
     def apply[K](rdd: RDD[(K, Tile)], metadata: TileLayerMetadata[K]): TileLayerRDD[K] =
…
44 changes: 0 additions & 44 deletions spark/src/main/scala/geotrellis/spark/raster/io/Implicits.scala

This file was deleted.

@@ -14,26 +14,27 @@
  * limitations under the License.
  */
 
-package geotrellis.spark.raster.io
+package geotrellis.spark.io.hadoop
 
 import geotrellis.raster.io.geotiff._
 import geotrellis.raster.testkit._
 import geotrellis.spark._
+import geotrellis.raster.{IntCellType, MultibandTile}
+import geotrellis.spark.io.hadoop
 import geotrellis.spark.testkit.TestEnvironment
 
 import org.apache.hadoop.fs.Path
 import org.scalatest._
-import java.io._
-
-import geotrellis.raster.{IntCellType, MultibandTile}
+import java.io._
 
-class HadoopWriterSpec extends FunSpec
+class HadoopRasterMethodsSpec extends FunSpec
   with Matchers
   with BeforeAndAfterAll
   with RasterMatchers
   with TileBuilders
   with TestEnvironment {
 
-  describe ("writing Rasters without errors and with correct tiles, crs and extent using Hadoop output stream") {
+  describe ("writing Rasters without errors and with correct tiles, crs and extent using Hadoop FSData{Input|Output} stream") {
     def expandGeoTiff(geoTiff: MultibandGeoTiff) =
       MultibandGeoTiff(
         MultibandTile(
@@ -52,6 +53,7 @@
       )
 
     val (pathTiff, pathPng, pathJpg) = (tempTiff.getPath, tempPng.getPath, tempJpg.getPath)
+    val (pathTiffGz, pathPngGz, pathJpgGz) = (s"${tempTiff.getPath}.gz", s"${tempPng.getPath}.gz", s"${tempJpg.getPath}.gz")
     val existencePath = "raster-test/data/aspect.tif"
 
     it("should write GeoTiff with tags") {
@@ -62,7 +64,7 @@
 
       geoTiff.write(new Path(pathTiff))
 
-      val actualTiff = GeoTiffHadoopReader.readMultiband(new Path(pathTiff))
+      val actualTiff = hadoop.HadoopGeoTiffReader.readMultiband(new Path(pathTiff))
       val actual = actualTiff.tile
       val actualTags = actualTiff.tags
 
@@ -76,9 +78,9 @@
       val expected = geoTiff.tile
       val expectedTags = geoTiff.tags
 
-      geoTiff.write(new Path(pathTiff), true)
+      geoTiff.write(new Path(pathTiffGz))
 
-      val actualTiff = GeoTiffHadoopReader.readMultiband(new Path(s"$pathTiff.gz"))
+      val actualTiff = hadoop.HadoopGeoTiffReader.readMultiband(new Path(pathTiffGz))
       val actual = actualTiff.tile
       val actualTags = actualTiff.tags
 
@@ -92,17 +94,17 @@
       val expected = geoTiff.tile.convert(IntCellType).renderPng()
       expected.write(new Path(pathPng))
 
-      val actual = PngHadoopReader.read(new Path(pathPng))
+      val actual = hadoop.HadoopPngReader.read(new Path(pathPng))
 
       actual.bytes should be (expected.bytes)
     }
 
     it("should write Png with gzip") {
       val geoTiff = expandGeoTiff(MultibandGeoTiff(existencePath))
       val expected = geoTiff.tile.convert(IntCellType).renderPng()
-      expected.write(new Path(pathPng), true)
+      expected.write(new Path(pathPngGz))
 
-      val actual = PngHadoopReader.read(new Path(s"$pathPng.gz"))
+      val actual = hadoop.HadoopPngReader.read(new Path(pathPngGz))
 
       actual.bytes should be (expected.bytes)
     }
@@ -112,17 +114,17 @@
       val expected = geoTiff.tile.convert(IntCellType).renderJpg()
       expected.write(new Path(pathJpg))
 
-      val actual = PngHadoopReader.read(new Path(pathJpg))
+      val actual = hadoop.HadoopPngReader.read(new Path(pathJpg))
 
      actual.bytes should be (expected.bytes)
     }
 
     it("should write Jpg with gzip") {
      val geoTiff = expandGeoTiff(MultibandGeoTiff(existencePath))
      val expected = geoTiff.tile.convert(IntCellType).renderJpg()
-      expected.write(new Path(pathJpg), true)
+      expected.write(new Path(pathJpgGz))
 
-      val actual = PngHadoopReader.read(new Path(s"$pathJpg.gz"))
+      val actual = hadoop.HadoopPngReader.read(new Path(pathJpgGz))
 
       actual.bytes should be (expected.bytes)
     }
…