Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rasters write support to HDFS / S3 #2102

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright 2016 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.spark.io.hadoop

import geotrellis.raster.io.geotiff.reader._
import geotrellis.raster.io.geotiff.{MultibandGeoTiff, SinglebandGeoTiff}
import geotrellis.vector.Extent

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext

/**
  * Reads GeoTiffs from HDFS (or any Hadoop-supported filesystem), with
  * optional decompression, streaming reads, and cropping to an extent.
  */
object HadoopGeoTiffReader {
  /** Reads a single-band GeoTiff using the implicit SparkContext's Hadoop configuration. */
  def readSingleband(path: Path)(implicit sc: SparkContext): SinglebandGeoTiff =
    readSingleband(path, decompress = true, streaming = false, None, sc.hadoopConfiguration)

  /** Reads a single-band GeoTiff; when `extent` is supplied the result is cropped to it. */
  def readSingleband(path: Path, decompress: Boolean, streaming: Boolean, extent: Option[Extent], conf: Configuration): SinglebandGeoTiff =
    HdfsUtils.read(path, conf) { is =>
      val tiff = GeoTiffReader.readSingleband(IOUtils.toByteArray(is), decompress, streaming)
      extent.fold(tiff)(e => tiff.crop(e))
    }

  /** Reads a multi-band GeoTiff using the implicit SparkContext's Hadoop configuration. */
  def readMultiband(path: Path)(implicit sc: SparkContext): MultibandGeoTiff =
    readMultiband(path, decompress = true, streaming = false, None, sc.hadoopConfiguration)

  /** Reads a multi-band GeoTiff; when `extent` is supplied the result is cropped to it. */
  def readMultiband(path: Path, decompress: Boolean, streaming: Boolean, extent: Option[Extent], conf: Configuration): MultibandGeoTiff =
    HdfsUtils.read(path, conf) { is =>
      val tiff = GeoTiffReader.readMultiband(IOUtils.toByteArray(is), decompress, streaming)
      extent.fold(tiff)(e => tiff.crop(e))
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2016 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.spark.io.hadoop

import geotrellis.raster.render.Jpg

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext

/** Reads Jpg images from HDFS (or any Hadoop-supported filesystem). */
object HadoopJpgReader {
  /** Reads a Jpg using the implicit SparkContext's Hadoop configuration. */
  def read(path: Path)(implicit sc: SparkContext): Jpg =
    read(path, sc.hadoopConfiguration)

  /** Reads the bytes at `path` (decompressed if a codec matches) and wraps them as a Jpg. */
  def read(path: Path, conf: Configuration): Jpg =
    HdfsUtils.read(path, conf) { is =>
      val bytes = IOUtils.toByteArray(is)
      Jpg(bytes)
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2016 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.spark.io.hadoop

import geotrellis.raster.render.Png
import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext

/** Reads Png images from HDFS (or any Hadoop-supported filesystem). */
object HadoopPngReader {
  /** Reads a Png using the implicit SparkContext's Hadoop configuration. */
  def read(path: Path)(implicit sc: SparkContext): Png =
    read(path, sc.hadoopConfiguration)

  /** Reads the bytes at `path` (decompressed if a codec matches) and wraps them as a Png. */
  def read(path: Path, conf: Configuration): Png =
    HdfsUtils.read(path, conf) { is =>
      val bytes = IOUtils.toByteArray(is)
      Png(bytes)
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2016 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.spark.io.hadoop

import geotrellis.util.MethodExtensions

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext

/**
  * Extension methods for writing a raster value of type `T` to a Hadoop
  * filesystem path.
  *
  * Implementors supply the configuration-based `write`; the convenience
  * overload derives the configuration from an implicit SparkContext.
  */
trait HadoopRasterMethods[T] extends MethodExtensions[T] {
  /** Writes `self` to `path` using the given Hadoop configuration. */
  def write(path: Path, conf: Configuration): Unit

  /** Writes `self` to `path` using the implicit SparkContext's Hadoop configuration. */
  def write(path: Path)(implicit sc: SparkContext): Unit =
    write(path, sc.hadoopConfiguration)
}
53 changes: 53 additions & 0 deletions spark/src/main/scala/geotrellis/spark/io/hadoop/HdfsUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package geotrellis.spark.io.hadoop

import geotrellis.util.LazyLogging

import org.apache.hadoop.io.compress.CompressionCodecFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
import org.apache.hadoop.mapreduce.Job
Expand Down Expand Up @@ -230,4 +231,56 @@ object HdfsUtils extends LazyLogging {
}
}
}

/**
  * Writes to `path` through a `DataOutputStream`, compressing the output when
  * the path's extension matches a registered Hadoop compression codec
  * (e.g. ".gz"); otherwise writes uncompressed. Both streams are closed even
  * if `dosWrite` throws.
  *
  * @param path     destination path; its filesystem is resolved from `conf`
  * @param conf     Hadoop configuration used to resolve the filesystem and codecs
  * @param dosWrite callback that writes the payload to the supplied stream
  */
def write(path: Path, conf: Configuration)(dosWrite: DataOutputStream => Unit): Unit = {
  val fs = path.getFileSystem(conf)

  val os = {
    val factory = new CompressionCodecFactory(conf)
    val codec = factory.getCodec(path)

    if (codec == null) {
      // No codec matches the file extension; fall back to a raw stream.
      // Use the logger (HdfsUtils extends LazyLogging) instead of println.
      logger.debug(s"No codec found for $path, writing without compression.")
      fs.create(path)
    } else {
      codec.createOutputStream(fs.create(path))
    }
  }
  try {
    val dos = new DataOutputStream(os)
    try {
      dosWrite(dos)
    } finally {
      dos.close() // side-effecting: explicit parentheses
    }
  } finally {
    os.close() // safe even after dos.close() closed the underlying stream
  }
}

/**
  * Reads from `path` through a `DataInputStream`, decompressing the input
  * when the path's extension matches a registered Hadoop compression codec
  * (e.g. ".gz"); otherwise reads raw bytes. Both streams are closed even if
  * `disRead` throws.
  *
  * @param path    source path; its filesystem is resolved from `conf`
  * @param conf    Hadoop configuration used to resolve the filesystem and codecs
  * @param disRead callback that consumes the stream and produces the result
  * @return whatever `disRead` produces
  */
def read[T](path: Path, conf: Configuration)(disRead: DataInputStream => T): T = {
  val fs = path.getFileSystem(conf)

  val is = {
    val factory = new CompressionCodecFactory(conf)
    val codec = factory.getCodec(path)

    if (codec == null) {
      // No codec matches the file extension; fall back to a raw stream.
      // Use the logger (HdfsUtils extends LazyLogging) instead of println.
      logger.debug(s"No codec found for $path, reading without compression.")
      fs.open(path)
    } else {
      codec.createInputStream(fs.open(path))
    }
  }
  try {
    val dis = new DataInputStream(is)
    try {
      disRead(dis)
    } finally {
      dis.close() // side-effecting: explicit parentheses
    }
  } finally {
    is.close() // safe even after dis.close() closed the underlying stream
  }
}
}
22 changes: 22 additions & 0 deletions spark/src/main/scala/geotrellis/spark/io/hadoop/Implicits.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@

package geotrellis.spark.io.hadoop

import geotrellis.raster.CellGrid
import geotrellis.raster.io.geotiff.GeoTiff
import geotrellis.raster.io.geotiff.writer.GeoTiffWriter
import geotrellis.raster.render.{Jpg, Png}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark._
import org.apache.spark.rdd._

Expand All @@ -25,4 +32,19 @@ trait Implicits {
implicit class HadoopSparkContextMethodsWrapper(val sc: SparkContext) extends HadoopSparkContextMethods
implicit class withSaveBytesToHadoopMethods[K](rdd: RDD[(K, Array[Byte])]) extends SaveBytesToHadoopMethods[K](rdd)
implicit class withSaveToHadoopMethods[K,V](rdd: RDD[(K,V)]) extends SaveToHadoopMethods[K, V](rdd)

/** Adds Hadoop-filesystem `write` methods to [[Jpg]] values. */
implicit class withJpgHadoopWriteMethods(val self: Jpg) extends HadoopRasterMethods[Jpg] {
  /** Writes the Jpg's raw bytes to `path` via HdfsUtils. */
  def write(path: Path, conf: Configuration): Unit =
    HdfsUtils.write(path, conf) { dos => dos.write(self.bytes) }
}

/** Adds Hadoop-filesystem `write` methods to [[Png]] values. */
implicit class withPngHadoopWriteMethods(val self: Png) extends HadoopRasterMethods[Png] {
  /** Writes the Png's raw bytes to `path` via HdfsUtils. */
  def write(path: Path, conf: Configuration): Unit =
    HdfsUtils.write(path, conf) { dos => dos.write(self.bytes) }
}

/** Adds Hadoop-filesystem `write` methods to [[GeoTiff]] values. */
implicit class withGeoTiffHadoopWriteMethods[T <: CellGrid](val self: GeoTiff[T]) extends HadoopRasterMethods[GeoTiff[T]] {
  /** Serializes the GeoTiff to `path` via GeoTiffWriter over an HDFS stream. */
  def write(path: Path, conf: Configuration): Unit =
    HdfsUtils.write(path, conf) { dos => new GeoTiffWriter(self, dos).write() }
}
}
1 change: 0 additions & 1 deletion spark/src/main/scala/geotrellis/spark/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import geotrellis.spark.tiling._
import geotrellis.spark.ingest._
import geotrellis.spark.crop._
import geotrellis.spark.filter._

import org.apache.spark.{Partitioner, SparkContext}
import org.apache.spark.rdd._
import spire.syntax.cfor._
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright 2016 Azavea
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package geotrellis.spark.io.hadoop

import geotrellis.raster.io.geotiff._
import geotrellis.raster.testkit._
import geotrellis.raster.{IntCellType, MultibandTile}
import geotrellis.spark.io.hadoop
import geotrellis.spark.testkit.TestEnvironment

import org.apache.hadoop.fs.Path
import org.scalatest._

import java.io._

/**
  * Round-trip tests for the Hadoop raster write methods: each test writes a
  * raster (GeoTiff/Png/Jpg, with and without gzip) through the Hadoop
  * FSData{Input|Output} streams and reads it back with the matching reader.
  */
class HadoopRasterMethodsSpec extends FunSpec
  with Matchers
  with BeforeAndAfterAll
  with RasterMatchers
  with TileBuilders
  with TestEnvironment {

  describe ("writing Rasters without errors and with correct tiles, crs and extent using Hadoop FSData{Input|Output} stream") {
    // Triples the band count so the renderers have enough data to exercise.
    def expandGeoTiff(geoTiff: MultibandGeoTiff) =
      MultibandGeoTiff(
        MultibandTile(
          geoTiff.tile.bands ++
          geoTiff.tile.bands ++
          geoTiff.tile.bands
        ),
        geoTiff.extent,
        geoTiff.crs
      )

    val (tempTiff, tempPng, tempJpg) = (
      File.createTempFile("geotiff-writer", ".tif"),
      File.createTempFile("geotiff-writer", ".png"),
      File.createTempFile("geotiff-writer", ".jpg")
    )
    // Clean up the temp files when the JVM exits; BeforeAndAfterAll is mixed
    // in by TestEnvironment, so avoid overriding afterAll here.
    tempTiff.deleteOnExit()
    tempPng.deleteOnExit()
    tempJpg.deleteOnExit()

    val (pathTiff, pathPng, pathJpg) = (tempTiff.getPath, tempPng.getPath, tempJpg.getPath)
    // The ".gz" suffix triggers the Hadoop gzip codec in HdfsUtils.
    val (pathTiffGz, pathPngGz, pathJpgGz) = (s"${tempTiff.getPath}.gz", s"${tempPng.getPath}.gz", s"${tempJpg.getPath}.gz")
    val existencePath = "raster-test/data/aspect.tif"

    it("should write GeoTiff with tags") {
      val geoTiff = MultibandGeoTiff(existencePath)

      val expected = geoTiff.tile
      val expectedTags = geoTiff.tags

      geoTiff.write(new Path(pathTiff))

      val actualTiff = hadoop.HadoopGeoTiffReader.readMultiband(new Path(pathTiff))
      val actual = actualTiff.tile
      val actualTags = actualTiff.tags

      actual should be (expected)
      actualTags should be (expectedTags)
    }

    it("should write GeoTiff with tags with gzip") {
      val geoTiff = MultibandGeoTiff(existencePath)

      val expected = geoTiff.tile
      val expectedTags = geoTiff.tags

      geoTiff.write(new Path(pathTiffGz))

      val actualTiff = hadoop.HadoopGeoTiffReader.readMultiband(new Path(pathTiffGz))
      val actual = actualTiff.tile
      val actualTags = actualTiff.tags

      actual should be (expected)
      actualTags should be (expectedTags)
    }

    it("should write Png") {
      val geoTiff = expandGeoTiff(MultibandGeoTiff(existencePath))

      val expected = geoTiff.tile.convert(IntCellType).renderPng()
      expected.write(new Path(pathPng))

      val actual = hadoop.HadoopPngReader.read(new Path(pathPng))

      actual.bytes should be (expected.bytes)
    }

    it("should write Png with gzip") {
      val geoTiff = expandGeoTiff(MultibandGeoTiff(existencePath))
      val expected = geoTiff.tile.convert(IntCellType).renderPng()
      expected.write(new Path(pathPngGz))

      val actual = hadoop.HadoopPngReader.read(new Path(pathPngGz))

      actual.bytes should be (expected.bytes)
    }

    it("should write Jpg") {
      val geoTiff = expandGeoTiff(MultibandGeoTiff(existencePath))
      val expected = geoTiff.tile.convert(IntCellType).renderJpg()
      expected.write(new Path(pathJpg))

      // Bug fix: read the Jpg back with the Jpg reader, not HadoopPngReader
      // (the original only passed because both readers just wrap raw bytes).
      val actual = hadoop.HadoopJpgReader.read(new Path(pathJpg))

      actual.bytes should be (expected.bytes)
    }

    it("should write Jpg with gzip") {
      val geoTiff = expandGeoTiff(MultibandGeoTiff(existencePath))
      val expected = geoTiff.tile.convert(IntCellType).renderJpg()
      expected.write(new Path(pathJpgGz))

      // Bug fix: use HadoopJpgReader for Jpg content (was HadoopPngReader).
      val actual = hadoop.HadoopJpgReader.read(new Path(pathJpgGz))

      actual.bytes should be (expected.bytes)
    }
  }
}