Skip to content

Commit

Permalink
Merge pull request #2998 from jbouffard/feature/range-reader-spi-2
Browse files Browse the repository at this point in the history
RangeReader SPI
  • Loading branch information
jbouffard authored Jun 18, 2019
2 parents c20a179 + 52689bb commit cd332e6
Show file tree
Hide file tree
Showing 52 changed files with 314 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package geotrellis.spark.store.accumulo
import geotrellis.store.accumulo._
import geotrellis.store.accumulo.conf.AccumuloConfig
import geotrellis.store.hadoop._
import geotrellis.store.hadoop.util._
import geotrellis.spark.util._
import geotrellis.spark.store._

Expand Down
4 changes: 4 additions & 0 deletions docs/CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ Changelog
API Changes & Project structure changes
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

- ``geotrellis.util``, ``geotrellis.store``, ``geotrellis.store.s3``

- **New:** An SPI interface has been created for ``RangeReader``.

- ``geotrellis.etl``

- **Remove:** ``geotrellis.etl`` has been removed. ETL has been archived at https://github.com/geotrellis/etl (`#2969 <https://github.com/locationtech/geotrellis/pull/2969>`_).
Expand Down
9 changes: 9 additions & 0 deletions layers/src/main/scala/geotrellis/layers/hadoop/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package geotrellis.layers

import org.apache.hadoop.fs.Path

/** Package-level constants and conveniences for Hadoop-backed storage paths. */
package object hadoop extends Implicits {
  // URI schemes recognized as Hadoop-filesystem-style locations (HDFS, S3 via
  // the Hadoop connectors, and Azure blob storage).
  final val SCHEMES: Array[String] = Array("hdfs", "hdfs+file", "s3n", "s3a", "wasb", "wasbs")

  // Lets callers pass a plain string wherever a Hadoop Path is expected.
  implicit def stringToPath(path: String): Path = new Path(path)
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,14 @@

package geotrellis.spark.store.s3.testkit

import geotrellis.spark.store.s3._

import software.amazon.awssdk.http.AbortableInputStream
import software.amazon.awssdk.core.ResponseInputStream
import software.amazon.awssdk.auth.credentials.{AwsBasicCredentials, StaticCredentialsProvider}
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.regions.Region
import cats._
import cats.implicits._

import java.net.URI

object MockS3Client {
def apply() = {
def apply(): S3Client = {
val cred = AwsBasicCredentials.create("minio", "password")
val credProvider = StaticCredentialsProvider.create(cred)
S3Client.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ import org.scalatest._
class S3LayerProviderSpec extends FunSpec with TestEnvironment {
val uri = new java.net.URI("s3://fake-bucket/some-prefix")
val client = MockS3Client()
lazy val getS3Client = () => MockS3Client()
S3TestUtils.cleanBucket(client, "fake-bucket")
S3ClientProducer.set(() => client)

it("construct S3AttributeStore from URI"){
val store = AttributeStore(uri)
assert(store.isInstanceOf[S3AttributeStore])
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package geotrellis.spark.store.s3.util

import geotrellis.spark.store.s3.S3TestUtils
import geotrellis.store.s3.util._
import geotrellis.spark.store.s3.testkit.MockS3Client
import geotrellis.store.s3.S3ClientProducer
import geotrellis.util.RangeReader

import org.scalatest._

class S3RangeReaderProviderSpec extends FunSpec with Matchers {
  // Suite-wide fixture: a mock-backed S3 client, registered as the producer so
  // that RangeReader's SPI lookup resolves against the mock.
  val client = MockS3Client()
  S3TestUtils.cleanBucket(client, "fake-bucket")
  S3ClientProducer.set(() => client)

  describe("S3RangeReaderProviderSpec") {
    val uri = new java.net.URI("s3://fake-bucket/some-prefix")

    it("should create a S3RangeReader from a URI") {
      // The SPI machinery should select the S3 provider for the s3:// scheme.
      assert(RangeReader(uri).isInstanceOf[S3RangeReader])
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
geotrellis.store.s3.util.S3RangeReaderProvider
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,8 @@ class S3COGCollectionLayerProvider extends AttributeStoreProvider
// Need to use an alternative to AmazonS3URI
// https://github.com/aws/aws-sdk-java-v2/issues/860
val s3Uri = new AmazonS3URI(uri)
val prefix =
Option(s3Uri.getKey()) match {
case Some(s) => s
case None => ""
}
val prefix = Option(s3Uri.getKey()).getOrElse("")

new S3AttributeStore(bucket = s3Uri.getBucket(), prefix = prefix, getClient)
}

Expand Down
58 changes: 35 additions & 23 deletions s3/src/main/scala/geotrellis/store/s3/util/S3RangeReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,31 @@ import java.net.URI
* @return A new instance of S3RangeReader.
*/
class S3RangeReader(
request: GetObjectRequest,
client: S3Client) extends RangeReader {

val metadata: HeadObjectResponse = {
val headRequest = HeadObjectRequest.builder()
.bucket(request.bucket)
.key(request.key)
.build()
request: => GetObjectRequest,
client: S3Client
) extends RangeReader {

lazy val metadata: HeadObjectResponse = {
val headRequest =
HeadObjectRequest
.builder()
.bucket(request.bucket)
.key(request.key)
.build()

client.headObject(headRequest)
}

val totalLength: Long = metadata.contentLength
lazy val totalLength: Long = metadata.contentLength

def readClippedRange(start: Long, length: Int): Array[Byte] = {
val getRequest = GetObjectRequest.builder()
.bucket(request.bucket)
.key(request.key)
.range(s"bytes=${start}-${start + length}")
.build()
val getRequest =
GetObjectRequest
.builder()
.bucket(request.bucket)
.key(request.key)
.range(s"bytes=${start}-${start + length}")
.build()

val is = client.getObject(getRequest)
val bytes = IOUtils.toByteArray(is)
Expand All @@ -73,11 +79,14 @@ object S3RangeReader {
apply(uri, S3ClientProducer.get())

def apply(uri: URI, client: S3Client): S3RangeReader = {
val s3Uri = new AmazonS3URI(uri)
val request = GetObjectRequest.builder()
.bucket(s3Uri.getBucket())
.key(s3Uri.getKey())
.build()
lazy val s3Uri = new AmazonS3URI(uri)
lazy val request =
GetObjectRequest
.builder()
.bucket(s3Uri.getBucket())
.key(s3Uri.getKey())
.build()

apply(request, client)
}

Expand All @@ -90,10 +99,13 @@ object S3RangeReader {
* @return A new instance of S3RangeReader.
*/
def apply(bucket: String, key: String, client: S3Client): S3RangeReader = {
val request = GetObjectRequest.builder()
.bucket(bucket)
.key(key)
.build()
lazy val request =
GetObjectRequest
.builder()
.bucket(bucket)
.key(key)
.build()

apply(request, client)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package geotrellis.store.s3.util

import geotrellis.store.s3._
import geotrellis.util.RangeReaderProvider

import software.amazon.awssdk.services.s3.S3Client

import java.net.URI

/** SPI provider that builds [[S3RangeReader]]s for `s3://` URIs. */
class S3RangeReaderProvider extends RangeReaderProvider {
  /** Returns true when the URI's scheme is "s3" (case-insensitive).
    * A URI with no scheme (getScheme returns null) is not processable.
    */
  def canProcess(uri: URI): Boolean =
    Option(uri.getScheme).exists(_.toLowerCase == "s3")

  /** Creates an [[S3RangeReader]] using the client from [[S3ClientProducer]]. */
  def rangeReader(uri: URI): S3RangeReader =
    rangeReader(uri, S3ClientProducer.get())

  /** Creates an [[S3RangeReader]] for the given URI with an explicit client.
    *
    * A URI with no key part yields an empty prefix rather than a null one.
    */
  def rangeReader(uri: URI, s3Client: S3Client): S3RangeReader = {
    val s3Uri = new AmazonS3URI(uri)
    val prefix = Option(s3Uri.getKey()).getOrElse("")

    S3RangeReader(s3Uri.getBucket(), prefix, s3Client)
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package geotrellis.spark.testkit

import geotrellis.layer._
import geotrellis.store.hadoop.HdfsUtils
import geotrellis.store.hadoop.util.HdfsUtils
import geotrellis.spark.util.SparkUtils
import geotrellis.spark.store.kryo.KryoRegistrator

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
geotrellis.spark.store.http.util.HttpRangeReaderProvider
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import geotrellis.raster.merge._
import geotrellis.raster.prototype._
import geotrellis.raster.resample.ResampleMethod
import geotrellis.store.cog.{COGLayerMetadata, ZoomRange}
import geotrellis.store.hadoop.{SerializableConfiguration, HdfsUtils}
import geotrellis.store.index.KeyIndex
import geotrellis.store.hadoop.SerializableConfiguration
import geotrellis.store.hadoop.util.HdfsUtils

import geotrellis.spark._
import geotrellis.spark.pyramid.Pyramid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package geotrellis.spark.store.hadoop
import geotrellis.raster.io.geotiff.reader.GeoTiffReader
import geotrellis.raster.io.geotiff.reader.GeoTiffReader.GeoTiffInfo
import geotrellis.raster.io.geotiff.tags.TiffTags
import geotrellis.store.hadoop.{HdfsUtils, HdfsRangeReader, SerializableConfiguration}
import geotrellis.store.hadoop.SerializableConfiguration
import geotrellis.store.hadoop.util.{HdfsUtils, HdfsRangeReader}
import geotrellis.spark.store._
import geotrellis.spark.store.hadoop._
import geotrellis.util.ByteReader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package geotrellis.spark.store.hadoop
import geotrellis.raster.io.geotiff.reader._
import geotrellis.raster.io.geotiff.{MultibandGeoTiff, SinglebandGeoTiff}
import geotrellis.vector.Extent
import geotrellis.store.hadoop.HdfsUtils
import geotrellis.store.hadoop.util.HdfsUtils

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package geotrellis.spark.store.hadoop

import geotrellis.raster.render.Jpg
import geotrellis.store.hadoop.HdfsUtils
import geotrellis.store.hadoop.util.HdfsUtils

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package geotrellis.spark.store.hadoop

import geotrellis.raster.render.Png
import geotrellis.store.hadoop.HdfsUtils
import geotrellis.store.hadoop.util.HdfsUtils

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import geotrellis.store.avro.codecs._
import geotrellis.store.index._
import geotrellis.store.hadoop.formats.FilterMapFileInputFormat
import geotrellis.store.hadoop._
import geotrellis.store.hadoop.util._
import geotrellis.spark._
import geotrellis.spark.store._
import geotrellis.spark.partition._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package geotrellis.spark.store.hadoop

import geotrellis.layer._
import geotrellis.store._
import geotrellis.store.hadoop.{HadoopAttributeStore, HadoopValueReader, HadoopCollectionLayerProvider}
import geotrellis.layer._
import geotrellis.store.hadoop.util.HdfsUtils
import geotrellis.spark._
import geotrellis.spark.store._
import geotrellis.util.UriUtils
Expand All @@ -39,16 +40,15 @@ import java.net.URI
 * This Provider intentionally does not handle the `s3` scheme because the Hadoop implementation is poor.
* That support is provided by [[HadoopAttributeStore]]
*/
class HadoopSparkLayerProvider extends HadoopCollectionLayerProvider
with LayerReaderProvider with LayerWriterProvider {
class HadoopSparkLayerProvider extends HadoopCollectionLayerProvider with LayerReaderProvider with LayerWriterProvider {

def layerReader(uri: URI, store: AttributeStore, sc: SparkContext): FilteringLayerReader[LayerId] = {
// don't need uri because HadoopLayerHeader contains full path of the layer
new HadoopLayerReader(store)(sc)
}

def layerWriter(uri: URI, store: AttributeStore): LayerWriter[LayerId] = {
val _uri = trim(uri)
val _uri = HdfsUtils.trim(uri)
val path = new Path(_uri)
val params = UriUtils.getParams(_uri)
val interval = params.getOrElse("interval", "4").toInt
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import geotrellis.store.cog.{ZoomRange, Extension}
import geotrellis.store.hadoop.conf.HadoopConfig
import geotrellis.store.hadoop._
import geotrellis.store.hadoop.cog.byteReader
import geotrellis.store.hadoop.util._
import geotrellis.store.index.Index
import geotrellis.spark.store.cog._
import geotrellis.spark.store.hadoop._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import geotrellis.store._
import geotrellis.store.cog.{COGLayerStorageMetadata, ZoomRange, _}
import geotrellis.store.cog.vrt.VRT
import geotrellis.store.cog.vrt.VRT.IndexedSimpleSource
import geotrellis.store.hadoop.{HadoopLayerHeader, HdfsUtils, SerializableConfiguration, HadoopAttributeStore}
import geotrellis.store.hadoop.{HadoopLayerHeader, SerializableConfiguration, HadoopAttributeStore}
import geotrellis.store.hadoop.cog.byteReader
import geotrellis.store.hadoop.util.HdfsUtils
import geotrellis.store.index._
import geotrellis.spark.store.cog._
import geotrellis.spark.store.hadoop._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import java.net.URI
import geotrellis.store.LayerId
import geotrellis.store.AttributeStore
import geotrellis.store.hadoop.cog.HadoopCOGCollectionLayerProvider
import geotrellis.store.hadoop.util.HdfsUtils
import geotrellis.spark.store.cog._

import org.apache.hadoop.fs.Path
import org.apache.spark.SparkContext

Expand All @@ -34,16 +36,14 @@ import org.apache.spark.SparkContext
 * This Provider intentionally does not handle the `s3` scheme because the Hadoop implementation is poor.
* That support is provided by [[HadoopAttributeStore]]
*/
class HadoopCOGSparkLayerProvider extends HadoopCOGCollectionLayerProvider
with COGLayerReaderProvider with COGLayerWriterProvider {

class HadoopCOGSparkLayerProvider extends HadoopCOGCollectionLayerProvider with COGLayerReaderProvider with COGLayerWriterProvider {
def layerReader(uri: URI, store: AttributeStore, sc: SparkContext): COGLayerReader[LayerId] = {
// don't need uri because HadoopLayerHeader contains full path of the layer
new HadoopCOGLayerReader(store)(sc)
}

def layerWriter(uri: URI, store: AttributeStore): COGLayerWriter = {
val _uri = trim(uri)
val _uri = HdfsUtils.trim(uri)
val path = new Path(_uri)
new HadoopCOGLayerWriter(path.toString, store)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package geotrellis.spark.store.hadoop.formats

import geotrellis.raster.io.geotiff.tags.TiffTags
import geotrellis.raster.io.geotiff.reader.TiffTagsReader
import geotrellis.store.hadoop.HdfsRangeReader
import geotrellis.store.hadoop.util.HdfsRangeReader
import geotrellis.spark.store.hadoop._
import geotrellis.util.StreamingByteReader

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package geotrellis.spark.store.hadoop.geotiff

import geotrellis.store.hadoop.{HdfsUtils, HdfsRangeReader}
import geotrellis.store.hadoop.util.{HdfsUtils, HdfsRangeReader}
import geotrellis.raster.io.geotiff.reader.TiffTagsReader
import geotrellis.util.annotations.experimental

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package geotrellis.spark.store.hadoop.geotiff

import geotrellis.store.hadoop.HdfsUtils
import geotrellis.store.hadoop.util.HdfsUtils
import geotrellis.spark.store.hadoop._
import geotrellis.util.annotations.experimental

Expand Down
Loading

0 comments on commit cd332e6

Please sign in to comment.