diff --git a/api/src/main/scala/Stac.scala b/api/src/main/scala/Stac.scala new file mode 100644 index 0000000..2b6d432 --- /dev/null +++ b/api/src/main/scala/Stac.scala @@ -0,0 +1,71 @@ +package org.wikiwatershed.mmw.geoprocessing + +import cats.syntax.functor._ +import cats.syntax.nested._ +import com.azavea.stac4s._ +import com.azavea.stac4s.api.client._ +import geotrellis.proj4._ +import geotrellis.raster._ +import geotrellis.vector._ +import sttp.client3.UriContext +import sttp.client3.akkahttp._ + +import java.time.Instant +import scala.concurrent._ + +trait Stac extends Utils { + import scala.concurrent.ExecutionContext.Implicits.global + + def getStacGroupedCount(input: StacInput): Future[Map[String, Int]] = { + val uri = uri"${input.stacUri}" + + // TODO Calculate this based on where the AoI is + val targetCRS = ConusAlbers + + val aoi = parseGeometry(input.shape, LatLng, LatLng) + val reprojectedAoI = aoi.reproject(LatLng, targetCRS) + + val collectionName = StringName(input.stacCollection) + val searchFilters = SearchFilters( + collections=List(collectionName.value), + intersects=Some(aoi), + datetime=Some( + TemporalExtent( + Instant.parse(s"${input.year}-01-02T00:00:00Z"), + Instant.parse(s"${input.year + 1}-01-01T00:00:00Z"))) + ) + val limit = 100 + val assetName = "supercell".r + val withGDAL = false + val parallelMosaicEnable = true + + val backend = AkkaHttpBackend() + val client = SttpStacClient(backend, uri) + + // Create a Mosaic Raster Source from the STAC Items + val source = client + .search(searchFilters) + .take(limit) + .compileToFutureList + .map(MosaicRasterSource.fromStacItems(collectionName, _, assetName, targetCRS, withGDAL, parallelMosaicEnable)) + + source + .nested + // Clip the Raster Source to the AoI's extent + .map { rs => rs.read(reprojectedAoI.extent) } + .value + .map { _.flatten } + .map { + case Some(raster) => raster + .tile + // Mask to the AoI + .mask(reprojectedAoI.extent, reprojectedAoI) + .band(0) + .histogram + .binCounts() + .map { case (value, count) => (s"List(${value})", count.toInt) } + .toMap + case None => Map.empty + } + } +} diff --git a/api/src/main/scala/WebServer.scala b/api/src/main/scala/WebServer.scala index 129cb5a..594a36c 100644 --- a/api/src/main/scala/WebServer.scala +++ b/api/src/main/scala/WebServer.scala @@ -48,6 +48,13 @@ case class MultiInput ( operations: List[Operation] ) +case class StacInput ( + shape: GeoJSONString, + stacUri: String, + stacCollection: String, + year: Int, +) + object PostRequestProtocol extends DefaultJsonProtocol { implicit val inputFormat: RootJsonFormat[InputData] = jsonFormat10(InputData) implicit val postFormat: RootJsonFormat[PostRequest] = jsonFormat1(PostRequest) @@ -59,9 +66,11 @@ object PostRequestProtocol extends DefaultJsonProtocol { implicit val hucFormat: RootJsonFormat[HUC] = jsonFormat2(HUC) implicit val operationFormat: RootJsonFormat[Operation] = jsonFormat5(Operation) implicit val multiInputFormat: RootJsonFormat[MultiInput] = jsonFormat3(MultiInput) + + implicit val stacInputFormat: RootJsonFormat[StacInput] = jsonFormat4(StacInput) } -object WebServer extends HttpApp with App with LazyLogging with Geoprocessing with ErrorHandler { +object WebServer extends HttpApp with App with LazyLogging with Geoprocessing with Stac with ErrorHandler { import PostRequestProtocol._ @throws(classOf[InvalidOperationException]) @@ -97,6 +106,11 @@ object WebServer extends HttpApp with App with LazyLogging with Geoprocessing wi entity(as[MultiInput]) { input => complete(getMultiOperations(input)) } + } ~ + path("stac") { + entity(as[StacInput]) { input => + complete(getStacGroupedCount(input)) + } } } } diff --git a/api/src/main/scala/package.scala b/api/src/main/scala/package.scala index 4b08693..199fce0 100644 --- a/api/src/main/scala/package.scala +++ b/api/src/main/scala/package.scala @@ -1,6 +1,21 @@ package org.wikiwatershed.mmw -import geotrellis.layer._ +import cats.{Foldable, FunctorFilter, ~>} +import cats.data.NonEmptyList +import cats.effect.IO +import cats.effect.unsafe.IORuntime +import cats.syntax.foldable._ +import cats.syntax.option._ +import com.azavea.stac4s.{StacAsset, StacItem} +import geotrellis.layer.{SpatialKey, TileLayerCollection} +import geotrellis.proj4.CRS +import geotrellis.raster.effects.MosaicRasterSourceIO +import geotrellis.raster.{EmptyName, GridExtent, MosaicRasterSource, RasterSource, SourceName, StringName} +import geotrellis.raster.geotiff.GeoTiffPath +import geotrellis.stac.raster.{StacAssetRasterSource, StacItemAsset} + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.matching.Regex package object geoprocessing { type HucID = String @@ -8,4 +23,97 @@ package object geoprocessing { type RasterID = String type RasterLayer = TileLayerCollection[SpatialKey] type GeoJSONString = String + + // The following is taken from https://github.com/geotrellis/geotrellis-server/tree/feature/stac-example-simple/stac-simple-example/src/main/scala/geotrellis/example by @pomadchin + + implicit class AssetsMapOps(private val assets: Map[String, StacAsset]) extends AnyVal { + def select(selector: Regex): Option[StacAsset] = assets.find { case (k, _) => selector.findFirstIn(k).nonEmpty }.map(_._2) + } + + implicit class StacAssetOps(private val self: StacAsset) extends AnyVal { + def hrefGDAL(withGDAL: Boolean): String = if (withGDAL) s"gdal+${self.href}" else s"${GeoTiffPath.PREFIX}${self.href}" + def withGDAL(withGDAL: Boolean): StacAsset = self.copy(href = hrefGDAL(withGDAL)) + } + + implicit class RasterSourcesQueryOps[G[_]: Foldable: FunctorFilter, T <: RasterSource](private val self: G[T]) { + def attributesByName: Map[String, String] = + self.foldMap { rs => + rs.name match { + case StringName(sn) => rs.attributes.map { case (k, v) => s"$sn-$k" -> v } + case EmptyName => rs.attributes + } + } + } + + implicit class MosaicRasterSourceOps(private val self: MosaicRasterSource.type) extends AnyVal { + def instance( + sourcesList: NonEmptyList[RasterSource], + targetCRS: CRS, + sourceName: SourceName, + stacAttributes: Map[String, String] + ): MosaicRasterSource = { + val combinedExtent = sourcesList.map(_.extent).toList.reduce(_ combine _) + val minCellSize = sourcesList.map(_.cellSize).toList.maxBy(_.resolution) + val combinedGridExtent = GridExtent[Long](combinedExtent, minCellSize) + + new MosaicRasterSource { + val sources: NonEmptyList[RasterSource] = sourcesList + val crs: CRS = targetCRS + def gridExtent: GridExtent[Long] = combinedGridExtent + val name: SourceName = sourceName + + override val attributes = stacAttributes + } + } + + def instance(sourcesList: NonEmptyList[RasterSource], targetCRS: CRS, stacAttributes: Map[String, String]): MosaicRasterSource = + instance(sourcesList, targetCRS, EmptyName, stacAttributes) + + def fromStacItems(collectionName: SourceName, + items: List[StacItem], + assetName: Regex, + targetCRS: CRS, + withGDAL: Boolean, + parallelMosaicEnabled: Boolean + ): Option[RasterSource] = { + val sources = StacAssetRasterSource(items, assetName, withGDAL) + sources match { + case head :: Nil => head.some + case head :: tail => + val reprojectedSources = NonEmptyList.of(head, tail: _*).map(_.reproject(targetCRS)) + val attributes = reprojectedSources.toList.attributesByName + + val mosaicRasterSource = + if (parallelMosaicEnabled) + MosaicRasterSourceIO.instance(reprojectedSources, targetCRS, collectionName, attributes)(IORuntime.global) + else + MosaicRasterSource.instance(reprojectedSources, targetCRS, collectionName, attributes) + + mosaicRasterSource.some + case _ => None + } + } + } + + // format: off + /** + * Ugly shims: + * 1. search via Futures backend and produce futures + * 2. map into IO to compile fs2.Stream + * 3. convert it back to Future[List[T]] + */ + // format: on + implicit class FS2StreamFutureOps[A](private val self: fs2.Stream[Future, A]) extends AnyVal { + def toStreamIO: fs2.Stream[IO, A] = self.translate(λ[Future ~> IO](future => IO.fromFuture(IO(future)))) + def compileToFutureList(implicit ec: ExecutionContext): Future[List[A]] = + Future(toStreamIO.compile.toList.unsafeRunSync()(IORuntime.global)) + } + + implicit class StacAssetRasterSourceOps(private val self: StacAssetRasterSource.type) extends AnyVal { + def apply(items: List[StacItem], assetName: Regex, withGDAL: Boolean): Seq[StacAssetRasterSource] = items.flatMap { item => + item.assets + .select(assetName) + .map(itemAsset => StacAssetRasterSource(StacItemAsset(itemAsset.withGDAL(withGDAL), item))) + } + } } diff --git a/build.sbt b/build.sbt index 1ba05ba..cfda799 100644 --- a/build.sbt +++ b/build.sbt @@ -43,7 +43,13 @@ lazy val apiDependencies = Seq( geotrellisS3, geotrellisGdal, geotrellisRaster, - geotrellisVector + geotrellisVector, + geotrellisStac, + kindProjector, + stac4s, + stac4sClient, + sttp, + sttpAkkaBackend, ) ) @@ -63,9 +69,12 @@ lazy val commonSettings = Seq( "GeoSolutions" at "https://maven.geo-solutions.it/", "LT-releases" at "https://repo.locationtech.org/content/groups/releases", "OSGeo" at "https://repo.osgeo.org/repository/release/", + "jitpack" at "https://jitpack.io", "maven2" at "https://repo1.maven.org/maven2" ), + addCompilerPlugin(kindProjector cross CrossVersion.full), + assembly / assemblyShadeRules := { val shadePackage = "org.wikiwatershed.shaded" Seq( diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 6b0ca68..348db68 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -15,6 +15,15 @@ object Dependencies { val geotrellisRasterTestkit = "org.locationtech.geotrellis" %% "geotrellis-raster-testkit" % Version.geotrellis val geotrellisGdal = "org.locationtech.geotrellis" %% "geotrellis-gdal" % Version.geotrellis + val geotrellisStac = "com.azavea.geotrellis" %% "geotrellis-stac" % Version.geotrellisStac + + val kindProjector = "org.typelevel" %% "kind-projector" % Version.kindProjector cross CrossVersion.full + + val stac4s = "com.azavea.stac4s" %% "core" % Version.stac4s + val stac4sClient = "com.azavea.stac4s" %% "client" % Version.stac4s + val sttp = "com.softwaremill.sttp.client3" %% "core" % Version.sttp + val sttpAkkaBackend = "com.softwaremill.sttp.client3" %% "akka-http-backend" % Version.sttp + val pureconfig = "com.github.pureconfig" %% "pureconfig" % "0.9.1" val logging = "com.typesafe.scala-logging" %% "scala-logging" % Version.scalaLogging val scalatest = "org.scalatest" %% "scalatest" % Version.scalatest diff --git a/project/Version.scala b/project/Version.scala index bdb71c9..ff99563 100644 --- a/project/Version.scala +++ b/project/Version.scala @@ -5,11 +5,15 @@ object Version { def either(environmentVariable: String, default: String): String = Properties.envOrElse(environmentVariable, default) - val akkaHttp = "10.5.3" - val akka = "2.8.5" - val geotrellis = "3.7.0" - val scala = either("SCALA_VERSION", "2.13.12") - val scalaLogging = "3.9.5" - val scalaParallel = "1.0.4" - val scalatest = "3.2.18" + val akkaHttp = "10.5.3" + val akka = "2.8.5" + val geotrellis = "3.7.0" + val geotrellisStac = "4.6.0" + val kindProjector = "0.13.3" + val scala = either("SCALA_VERSION", "2.13.12") + val scalaLogging = "3.9.5" + val scalaParallel = "1.0.4" + val scalatest = "3.2.18" + val stac4s = "0.9.0" + val sttp = "3.9.6" } diff --git a/project/metals.sbt b/project/metals.sbt index 119c929..aaf3382 100644 --- a/project/metals.sbt +++ b/project/metals.sbt @@ -1,6 +1,8 @@ +// format: off // DO NOT EDIT! This file is auto-generated. // This file enables sbt-bloop to create bloop config files. -addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.5.15") +addSbtPlugin("ch.epfl.scala" % "sbt-bloop" % "1.5.17") +// format: on