Skip to content

Commit

Permalink
Merge pull request #572 from s22s/feature/slippy
Browse files Browse the repository at this point in the history
Slippy map writing support.
  • Loading branch information
metasim authored Nov 29, 2021
2 parents 8d901b7 + c7183ce commit 44f4072
Show file tree
Hide file tree
Showing 12 changed files with 693 additions and 51 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ lazy val docs = project
Compile / paradoxMaterialTheme ~= { _
.withRepository(uri("https://github.com/locationtech/rasterframes"))
.withCustomStylesheet("assets/custom.css")
.withCopyright("""&copy; 2017-2019 <a href="https://astraea.earth">Astraea</a>, Inc. All rights reserved.""")
.withCopyright("""&copy; 2017-2021 <a href="https://astraea.earth">Astraea</a>, Inc. All rights reserved.""")
.withLogo("assets/images/RF-R.svg")
.withFavicon("assets/images/RasterFrames_32x32.ico")
.withColor("blue-grey", "light-blue")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{Column, Row, TypedColumn}

class ProjectedLayerMetadataAggregate(destCRS: CRS, destDims: Dimensions[Int]) extends UserDefinedAggregateFunction {

import ProjectedLayerMetadataAggregate._

def inputSchema: StructType = InputRecord.inputRecordEncoder.schema
Expand All @@ -47,10 +48,10 @@ class ProjectedLayerMetadataAggregate(destCRS: CRS, destDims: Dimensions[Int]) e
def initialize(buffer: MutableAggregationBuffer): Unit = ()

def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if(!input.isNullAt(0)) {
if (!input.isNullAt(0)) {
val in = input.as[InputRecord]

if(buffer.isNullAt(0)) {
if (buffer.isNullAt(0)) {
in.toBufferRecord(destCRS).write(buffer)
} else {
val br = buffer.as[BufferRecord]
Expand All @@ -71,16 +72,15 @@ class ProjectedLayerMetadataAggregate(destCRS: CRS, destDims: Dimensions[Int]) e
case _ => ()
}

def evaluate(buffer: Row): Any = {
val buf = buffer.as[BufferRecord]
if (buf.isEmpty) throw new IllegalArgumentException("Can not collect metadata from empty data frame.")
def evaluate(buffer: Row): Any =
Option(buffer).map(_.as[BufferRecord]).filter(!_.isEmpty).map(buf => {
val re = RasterExtent(buf.extent, buf.cellSize)
val layout = LayoutDefinition(re, destDims.cols, destDims.rows)

val re = RasterExtent(buf.extent, buf.cellSize)
val layout = LayoutDefinition(re, destDims.cols, destDims.rows)
val kb = KeyBounds(layout.mapTransform(buf.extent))
TileLayerMetadata(buf.cellType, layout, buf.extent, destCRS, kb).toRow

val kb = KeyBounds(layout.mapTransform(buf.extent))
TileLayerMetadata(buf.cellType, layout, buf.extent, destCRS, kb).toRow
}
}).getOrElse(throw new IllegalArgumentException("Can not collect metadata from empty data frame."))
}

object ProjectedLayerMetadataAggregate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ org.locationtech.rasterframes.datasource.raster.RasterSourceDataSource
org.locationtech.rasterframes.datasource.geojson.GeoJsonDataSource
org.locationtech.rasterframes.datasource.stac.api.StacApiDataSource
org.locationtech.rasterframes.datasource.tiles.TilesDataSource
org.locationtech.rasterframes.datasource.slippy.SlippyDataSource
77 changes: 77 additions & 0 deletions datasource/src/main/resources/slippy.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<!DOCTYPE html>
<!--
~ This software is licensed under the Apache 2 license, quoted below.
~
~ Copyright 2021 Astraea. Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License"); you may not
~ use this file except in compliance with the License. You may obtain a copy of
~ the License at
~
~ [http://www.apache.org/licenses/LICENSE-2.0]
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
~ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
~ License for the specific language governing permissions and limitations under
~ the License.
~
~
-->

<html lang="en">
<head>
<title>RasterFrames Rendering</title>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1.0"/>
<meta property="eai:center" content="(${viewLat},${viewLon})"/>
<meta property="eai:maxZoom" content="${maxNativeZoom}"/>
<link rel="stylesheet" href="https://unpkg.com/leaflet@1.3.1/dist/leaflet.css" integrity="sha512-Rksm5RenBEKSKFjgI3a41vrjkw4EVPlJ3+OiI65vTjIdo9brlAacEuKOiQ5OFh7cOI1bkDwLqdLw3Zg0cRJAAQ==" crossorigin=""/>
<script src="https://unpkg.com/leaflet@1.3.1/dist/leaflet.js" integrity="sha512-/Nsx9X4HebavoBvEBuyp3I7od5tA0UzAxs+j83KgC8PU0kgB4XiK4Lfe4y4cgBtaRJQEIFCW+oC506aPT2L1zw==" crossorigin=""></script>
<link rel="stylesheet" href="https://unpkg.com/leaflet-control-geocoder/dist/Control.Geocoder.css" />
<script src="https://unpkg.com/leaflet-control-geocoder/dist/Control.Geocoder.js"></script>
<style>
#mapid {
position: absolute;
top: 10px;
bottom: 10px;
left: 10px;
right: 10px;
}
</style>
</head>
<body>

<div id="mapid"></div>

<script>

var map = L.map('mapid')
.setView([${viewLat}, ${viewLon}], ${maxNativeZoom});

L.tileLayer('https://{s}.tile.openstreetmap.org/{z}/{x}/{y}.png', {attribution: '&copy; <a href="https://www.openstreetmap.org/copyright">OpenStreetMap</a> contributors'}).addTo(map);

L.tileLayer(
'{z}/{x}/{y}.png', {
maxZoom: 18,
maxNativeZoom: ${maxNativeZoom}
}
).addTo(map);

L.control.scale().addTo(map);

L.Control.geocoder().addTo(map);

var popup = L.popup();

function showPos(e) {
popup
.setLatLng(e.latlng)
.setContent(e.latlng.toString())
.openOn(map);
}

map.on('click', showPos);
</script>
</body>
</html>
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package org.locationtech.rasterframes
import cats.syntax.option._
import io.circe.Json
import io.circe.parser
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import sttp.model.Uri

Expand Down Expand Up @@ -72,4 +73,48 @@ package object datasource {
def jsonParam(key: String, parameters: CaseInsensitiveStringMap): Option[Json] =
if(parameters.containsKey(key)) parser.parse(parameters.get(key)).toOption
else None


/**
* Convenience grouping for transient columns defining spatial context.
*/
private[rasterframes]
case class SpatialComponents(crsColumn: Column,
extentColumn: Column,
dimensionColumn: Column,
cellTypeColumn: Column)

private[rasterframes]
object SpatialComponents {
def apply(tileColumn: Column, crsColumn: Column, extentColumn: Column): SpatialComponents = {
val dim = rf_dimensions(tileColumn) as "dims"
val ct = rf_cell_type(tileColumn) as "cellType"
SpatialComponents(crsColumn, extentColumn, dim, ct)
}
def apply(prColumn : Column): SpatialComponents = {
SpatialComponents(
rf_crs(prColumn) as "crs",
rf_extent(prColumn) as "extent",
rf_dimensions(prColumn) as "dims",
rf_cell_type(prColumn) as "cellType"
)
}
}

/**
* If the given DataFrame has extent and CRS columns return the DataFrame, the CRS column an extent column.
* Otherwise, see if there's a `ProjectedRaster` column add `crs` and `extent` columns extracted from the
* `ProjectedRaster` column to the returned DataFrame.
*
* @param d DataFrame to process.
* @return Tuple containing the updated DataFrame followed by the CRS column and the extent column
*/
private[rasterframes]
def projectSpatialComponents(d: DataFrame): Option[SpatialComponents] =
d.tileColumns.headOption.zip(d.crsColumns.headOption.zip(d.extentColumns.headOption)).headOption
.map { case (tile, (crs, extent)) => SpatialComponents(tile, crs, extent) }
.orElse(
d.projRasterColumns.headOption
.map(pr => SpatialComponents(pr))
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Copyright (c) 2020 Astraea, Inc. All right reserved.
*/

package org.locationtech.rasterframes.datasource.slippy

import geotrellis.layer.{SpatialKey, TileLayerMetadata, ZoomedLayoutScheme}
import geotrellis.proj4.{LatLng, WebMercator}
import geotrellis.raster._
import geotrellis.raster.render.ColorRamp
import geotrellis.raster.resample.Bilinear
import geotrellis.spark._
import geotrellis.spark.pyramid.Pyramid
import geotrellis.spark.store.slippy.HadoopSlippyTileWriter
import geotrellis.vector.reproject.Implicits._
import org.apache.commons.text.StringSubstitutor
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.locationtech.rasterframes.encoders.StandardEncoders
import org.locationtech.rasterframes.expressions.aggregates.ProjectedLayerMetadataAggregate
import org.locationtech.rasterframes.util.withResource
import org.locationtech.rasterframes.{rf_agg_approx_histogram, _}
import org.locationtech.rasterframes.datasource._

import java.io.PrintStream
import java.net.URI
import java.nio.file.Paths
import scala.io.Source
import RenderingProfiles._
import org.locationtech.rasterframes.datasource.slippy.RenderingModes.{RenderingMode, Uniform}

object DataFrameSlippyExport extends StandardEncoders {
val destCRS = WebMercator

/**
* Export tiles as a slippy map.
* NB: Temporal components are ignored blindly.
*
* @param dest URI for Hadoop supported storage endpoint (e.g. 'file://', 'hdfs://', etc.).
* @param profile Rendering profile
*/
def writeSlippyTiles(df: DataFrame, dest: URI, profile: Profile): SlippyResult = {

val spark = df.sparkSession
implicit val sc = spark.sparkContext

val outputPath: String = dest.toASCIIString

require(
df.tileColumns.length >= profile.expectedBands, // TODO: Do we want to allow this greater than case? Warn the user?
s"Selected rendering mode '${profile}' expected ${profile.expectedBands} bands.")

// select only the tile columns given by user and crs, extent columns which are fallback if first `column` is not a PRT
val SpatialComponents(crs, extent, dims, cellType) = projectSpatialComponents(df)
.getOrElse(
throw new IllegalArgumentException("Provided dataframe did not have an Extent and/or CRS"))

val tlm: TileLayerMetadata[SpatialKey] =
df.select(
ProjectedLayerMetadataAggregate(
destCRS,
extent,
crs,
cellType,
dims
)
)
.first()

val rfLayer = df
.toLayer(tlm)
// TODO: this should be fixed in RasterFrames
.na
.drop()
.persist()
.asInstanceOf[RasterFrameLayer]

val inputRDD: MultibandTileLayerRDD[SpatialKey] =
rfLayer.toMultibandTileLayerRDD match {
case Left(spatial) => spatial
case Right(_) =>
throw new NotImplementedError(
"Dataframes with multiple temporal values are not yet supported.")
}

val tileColumns = rfLayer.tileColumns

val rp = profile match {
case up: UniformColorRampProfile =>
val hist = rfLayer
.select(rf_agg_approx_histogram(tileColumns.head))
.first()
up.toResolvedProfile(hist)
case up: UniformRGBColorProfile =>
require(tileColumns.length >= 3)
val stats = rfLayer
.select(
rf_agg_stats(tileColumns(0)),
rf_agg_stats(tileColumns(1)),
rf_agg_stats(tileColumns(2)))
.first()
up.toResolvedProfile(stats._1, stats._2, stats._3)
case o => o
}

val layoutScheme = ZoomedLayoutScheme(WebMercator, tileSize = 256)

val (zoom, reprojected) = inputRDD.reproject(WebMercator, layoutScheme, Bilinear)
val renderer = (_: SpatialKey, tile: MultibandTile) => rp.render(tile).bytes
val writer = new HadoopSlippyTileWriter[MultibandTile](outputPath, "png")(renderer)

// Pyramiding up the zoom levels, write our tiles out to the local file system.
Pyramid.upLevels(reprojected, layoutScheme, zoom, Bilinear) { (rdd, z) =>
writer.write(z, rdd)
}

rfLayer.unpersist()

val center = reprojected.metadata.extent.center
.reproject(WebMercator, LatLng)

SlippyResult(dest, center.getY, center.getX, zoom)
}

def writeSlippyTiles(df: DataFrame, dest: URI, renderingMode: RenderingMode): SlippyResult = {

val profile = (df.tileColumns.length, renderingMode) match {
case (1, Uniform) => UniformColorRampProfile(greyscale)
case (_, Uniform) => UniformRGBColorProfile()
case (1, _) => ColorRampProfile(greyscale)
case _ => RGBColorProfile()
}
writeSlippyTiles(df, dest, profile)
}

def writeSlippyTiles(df: DataFrame, dest: URI, colorRamp: ColorRamp, renderingMode: RenderingMode): SlippyResult = {
val profile = renderingMode match {
case Uniform UniformColorRampProfile(colorRamp)
case _ ColorRampProfile(colorRamp)
}
writeSlippyTiles(df, dest, profile)
}

case class SlippyResult(dest: URI, centerLat: Double, centerLon: Double, maxZoom: Int) {
// for python interop
def outputUrl(): String = dest.toASCIIString

def writeHtml(spark: SparkSession): Unit = {
import java.util.{HashMap => JMap}

val subst = new StringSubstitutor(new JMap[String, String]() {
put("maxNativeZoom", maxZoom.toString)
put("id", Paths.get(dest.getPath).getFileName.toString)
put("viewLat", centerLat.toString)
put("viewLon", centerLon.toString)
})

val rawLines = Source.fromInputStream(getClass.getResourceAsStream("/slippy.html")).getLines()

val fs = FileSystem.get(dest, spark.sparkContext.hadoopConfiguration)

withResource(fs.create(new Path(new Path(dest), "index.html"), true)) { hout =>
val out = new PrintStream(hout, true, "UTF-8")
for (line <- rawLines) {
out.println(subst.replace(line))
}
}
}
}
}
Loading

0 comments on commit 44f4072

Please sign in to comment.