Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add STAC Summary Endpoint for Global 10m LULC Data #117

Open
wants to merge 13 commits into
base: develop
Choose a base branch
from
71 changes: 71 additions & 0 deletions api/src/main/scala/Stac.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package org.wikiwatershed.mmw.geoprocessing

import cats.syntax.functor._
import cats.syntax.nested._
import com.azavea.stac4s._
import com.azavea.stac4s.api.client._
import geotrellis.proj4._
import geotrellis.raster._
import geotrellis.vector._
import sttp.client3.UriContext
import sttp.client3.akkahttp._

import java.time.Instant
import scala.concurrent._

trait Stac extends Utils {
import scala.concurrent.ExecutionContext.Implicits.global

def getStacGroupedCount(input: StacInput): Future[Map[String, Int]] = {
val uri = uri"${input.stacUri}"

// TODO Calculate this based on where the AoI is
val targetCRS = ConusAlbers

val aoi = parseGeometry(input.shape, LatLng, LatLng)
val reprojectedAoI = aoi.reproject(LatLng, targetCRS)

val collectionName = StringName(input.stacCollection)
val searchFilters = SearchFilters(
collections=List(collectionName.value),
intersects=Some(aoi),
datetime=Some(
TemporalExtent(
Instant.parse(s"${input.year}-01-02T00:00:00Z"),
Instant.parse(s"${input.year + 1}-01-01T00:00:00Z")))
)
val limit = 100
val assetName = "supercell".r
val withGDAL = false
val parallelMosaicEnable = true

val backend = AkkaHttpBackend()
val client = SttpStacClient(backend, uri)

// Create a Mosaic Raster Source from the STAC Items
val source = client
.search(searchFilters)
.take(limit)
.compileToFutureList
.map(MosaicRasterSource.fromStacItems(collectionName, _, assetName, targetCRS, withGDAL, parallelMosaicEnable))

source
.nested
// Clip the Raster Source to the AoI's extent
.map { rs => rs.read(reprojectedAoI.extent) }
.value
.map { _.flatten }
.map {
case Some(raster) => raster
.tile
// Mask to the AoI
.mask(reprojectedAoI.extent, reprojectedAoI)
.band(0)
.histogram
.binCounts()
.map { case (value, count) => (s"List(${value})", count.toInt) }
.toMap
case None => Map.empty
}
}
}
16 changes: 15 additions & 1 deletion api/src/main/scala/WebServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,13 @@ case class MultiInput (
operations: List[Operation]
)

case class StacInput (
shape: GeoJSONString,
stacUri: String,
stacCollection: String,
year: Int,
)

object PostRequestProtocol extends DefaultJsonProtocol {
implicit val inputFormat: RootJsonFormat[InputData] = jsonFormat10(InputData)
implicit val postFormat: RootJsonFormat[PostRequest] = jsonFormat1(PostRequest)
Expand All @@ -59,9 +66,11 @@ object PostRequestProtocol extends DefaultJsonProtocol {
implicit val hucFormat: RootJsonFormat[HUC] = jsonFormat2(HUC)
implicit val operationFormat: RootJsonFormat[Operation] = jsonFormat5(Operation)
implicit val multiInputFormat: RootJsonFormat[MultiInput] = jsonFormat3(MultiInput)

implicit val stacInputFormat: RootJsonFormat[StacInput] = jsonFormat4(StacInput)
}

object WebServer extends HttpApp with App with LazyLogging with Geoprocessing with ErrorHandler {
object WebServer extends HttpApp with App with LazyLogging with Geoprocessing with Stac with ErrorHandler {
import PostRequestProtocol._

@throws(classOf[InvalidOperationException])
Expand Down Expand Up @@ -97,6 +106,11 @@ object WebServer extends HttpApp with App with LazyLogging with Geoprocessing wi
entity(as[MultiInput]) { input =>
complete(getMultiOperations(input))
}
} ~
path("stac") {
entity(as[StacInput]) { input =>
complete(getStacGroupedCount(input))
}
}
}
}
Expand Down
110 changes: 109 additions & 1 deletion api/src/main/scala/package.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,119 @@
package org.wikiwatershed.mmw

import geotrellis.layer._
import cats.{Foldable, FunctorFilter, ~>}
import cats.data.NonEmptyList
import cats.effect.IO
import cats.effect.unsafe.IORuntime
import cats.syntax.foldable._
import cats.syntax.option._
import com.azavea.stac4s.{StacAsset, StacItem}
import geotrellis.layer.{SpatialKey, TileLayerCollection}
import geotrellis.proj4.CRS
import geotrellis.raster.effects.MosaicRasterSourceIO
import geotrellis.raster.{EmptyName, GridExtent, MosaicRasterSource, RasterSource, SourceName, StringName}
import geotrellis.raster.geotiff.GeoTiffPath
import geotrellis.stac.raster.{StacAssetRasterSource, StacItemAsset}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.matching.Regex

package object geoprocessing {
type HucID = String
type OperationID = String
type RasterID = String
type RasterLayer = TileLayerCollection[SpatialKey]
type GeoJSONString = String

// The following is taken from https://github.com/geotrellis/geotrellis-server/tree/feature/stac-example-simple/stac-simple-example/src/main/scala/geotrellis/example by @pomadchin

implicit class AssetsMapOps(private val assets: Map[String, StacAsset]) extends AnyVal {
def select(selector: Regex): Option[StacAsset] = assets.find { case (k, _) => selector.findFirstIn(k).nonEmpty }.map(_._2)
}

implicit class StacAssetOps(private val self: StacAsset) extends AnyVal {
def hrefGDAL(withGDAL: Boolean): String = if (withGDAL) s"gdal+${self.href}" else s"${GeoTiffPath.PREFIX}${self.href}"
def withGDAL(withGDAL: Boolean): StacAsset = self.copy(href = hrefGDAL(withGDAL))
}

implicit class RasterSourcesQueryOps[G[_]: Foldable: FunctorFilter, T <: RasterSource](private val self: G[T]) {
def attributesByName: Map[String, String] =
self.foldMap { rs =>
rs.name match {
case StringName(sn) => rs.attributes.map { case (k, v) => s"$sn-$k" -> v }
case EmptyName => rs.attributes
}
}
}

implicit class MosaicRasterSourceOps(private val self: MosaicRasterSource.type) extends AnyVal {
def instance(
sourcesList: NonEmptyList[RasterSource],
targetCRS: CRS,
sourceName: SourceName,
stacAttributes: Map[String, String]
): MosaicRasterSource = {
val combinedExtent = sourcesList.map(_.extent).toList.reduce(_ combine _)
val minCellSize = sourcesList.map(_.cellSize).toList.maxBy(_.resolution)
val combinedGridExtent = GridExtent[Long](combinedExtent, minCellSize)

new MosaicRasterSource {
val sources: NonEmptyList[RasterSource] = sourcesList
val crs: CRS = targetCRS
def gridExtent: GridExtent[Long] = combinedGridExtent
val name: SourceName = sourceName

override val attributes = stacAttributes
}
}

def instance(sourcesList: NonEmptyList[RasterSource], targetCRS: CRS, stacAttributes: Map[String, String]): MosaicRasterSource =
instance(sourcesList, targetCRS, EmptyName, stacAttributes)

def fromStacItems(collectionName: SourceName,
items: List[StacItem],
assetName: Regex,
targetCRS: CRS,
withGDAL: Boolean,
parallelMosaicEnabled: Boolean
): Option[RasterSource] = {
val sources = StacAssetRasterSource(items, assetName, withGDAL)
sources match {
case head :: Nil => head.some
case head :: tail =>
val reprojectedSources = NonEmptyList.of(head, tail: _*).map(_.reproject(targetCRS))
val attributes = reprojectedSources.toList.attributesByName

val mosaicRasterSource =
if (parallelMosaicEnabled)
MosaicRasterSourceIO.instance(reprojectedSources, targetCRS, collectionName, attributes)(IORuntime.global)
else
MosaicRasterSource.instance(reprojectedSources, targetCRS, collectionName, attributes)

mosaicRasterSource.some
case _ => None
}
}
}

// format: off
/**
* Ugly shims:
* 1. search via Futures backend and produce futures
* 2. map into IO to compile fs2.Stream
* 3. convert it back to Future[List[T]]
*/
// format: on
implicit class FS2StreamFutureOps[A](private val self: fs2.Stream[Future, A]) extends AnyVal {
def toStreamIO: fs2.Stream[IO, A] = self.translate(λ[Future ~> IO](future => IO.fromFuture(IO(future))))
def compileToFutureList(implicit ec: ExecutionContext): Future[List[A]] =
Future(toStreamIO.compile.toList.unsafeRunSync()(IORuntime.global))
}

implicit class StacAssetRasterSourceOps(private val self: StacAssetRasterSource.type) extends AnyVal {
def apply(items: List[StacItem], assetName: Regex, withGDAL: Boolean): Seq[StacAssetRasterSource] = items.flatMap { item =>
item.assets
.select(assetName)
.map(itemAsset => StacAssetRasterSource(StacItemAsset(itemAsset.withGDAL(withGDAL), item)))
}
}
}
11 changes: 10 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@ lazy val apiDependencies = Seq(
geotrellisS3,
geotrellisGdal,
geotrellisRaster,
geotrellisVector
geotrellisVector,
geotrellisStac,
kindProjector,
stac4s,
stac4sClient,
sttp,
sttpAkkaBackend,
)
)

Expand All @@ -63,9 +69,12 @@ lazy val commonSettings = Seq(
"GeoSolutions" at "https://maven.geo-solutions.it/",
"LT-releases" at "https://repo.locationtech.org/content/groups/releases",
"OSGeo" at "https://repo.osgeo.org/repository/release/",
"jitpack" at "https://jitpack.io",
"maven2" at "https://repo1.maven.org/maven2"
),

addCompilerPlugin(kindProjector cross CrossVersion.full),

assembly / assemblyShadeRules := {
val shadePackage = "org.wikiwatershed.shaded"
Seq(
Expand Down
9 changes: 9 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@ object Dependencies {
val geotrellisRasterTestkit = "org.locationtech.geotrellis" %% "geotrellis-raster-testkit" % Version.geotrellis
val geotrellisGdal = "org.locationtech.geotrellis" %% "geotrellis-gdal" % Version.geotrellis

val geotrellisStac = "com.azavea.geotrellis" %% "geotrellis-stac" % Version.geotrellisStac

val kindProjector = "org.typelevel" %% "kind-projector" % Version.kindProjector cross CrossVersion.full

val stac4s = "com.azavea.stac4s" %% "core" % Version.stac4s
val stac4sClient = "com.azavea.stac4s" %% "client" % Version.stac4s
val sttp = "com.softwaremill.sttp.client3" %% "core" % Version.sttp
val sttpAkkaBackend = "com.softwaremill.sttp.client3" %% "akka-http-backend" % Version.sttp

val pureconfig = "com.github.pureconfig" %% "pureconfig" % "0.9.1"
val logging = "com.typesafe.scala-logging" %% "scala-logging" % Version.scalaLogging
val scalatest = "org.scalatest" %% "scalatest" % Version.scalatest
Expand Down
18 changes: 11 additions & 7 deletions project/Version.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ object Version {
def either(environmentVariable: String, default: String): String =
Properties.envOrElse(environmentVariable, default)

val akkaHttp = "10.5.3"
val akka = "2.8.5"
val geotrellis = "3.7.0"
val scala = either("SCALA_VERSION", "2.13.12")
val scalaLogging = "3.9.5"
val scalaParallel = "1.0.4"
val scalatest = "3.2.18"
val akkaHttp = "10.5.3"
val akka = "2.8.5"
val geotrellis = "3.7.0"
val geotrellisStac = "4.6.0"
val kindProjector = "0.13.3"
val scala = either("SCALA_VERSION", "2.13.12")
val scalaLogging = "3.9.5"
val scalaParallel = "1.0.4"
val scalatest = "3.2.18"
val stac4s = "0.9.0"
val sttp = "3.9.6"
}
4 changes: 3 additions & 1 deletion project/metals.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// format: off
// DO NOT EDIT! This file is auto-generated.

// This file enables sbt-bloop to create bloop config files.

addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.5.15")
addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.5.17")

// format: on
Loading