DRY
bossie committed Oct 30, 2024
1 parent bb8871a commit c484df2
Showing 3 changed files with 53 additions and 81 deletions.
45 changes: 45 additions & 0 deletions openeogeotrellis/collections/__init__.py
@@ -0,0 +1,45 @@
import datetime as dt
from typing import Callable

import geopyspark
from py4j.java_gateway import JavaObject


def convert_scala_metadata(
    metadata_sc: JavaObject, epoch_ms_to_datetime: Callable[[int], dt.datetime], logger
) -> geopyspark.Metadata:
    """
    Convert geotrellis TileLayerMetadata (Java) object to geopyspark Metadata object
    """
    logger.info("Convert {m!r} to geopyspark.Metadata".format(m=metadata_sc))
    crs_py = str(metadata_sc.crs())
    cell_type_py = str(metadata_sc.cellType())

    def convert_key(key_sc: JavaObject) -> geopyspark.SpaceTimeKey:
        return geopyspark.SpaceTimeKey(
            col=key_sc.col(), row=key_sc.row(), instant=epoch_ms_to_datetime(key_sc.instant())
        )

    bounds_sc = metadata_sc.bounds()
    bounds_py = geopyspark.Bounds(minKey=convert_key(bounds_sc.minKey()), maxKey=convert_key(bounds_sc.maxKey()))

    def convert_extent(extent_sc: JavaObject) -> geopyspark.Extent:
        return geopyspark.Extent(extent_sc.xmin(), extent_sc.ymin(), extent_sc.xmax(), extent_sc.ymax())

    extent_py = convert_extent(metadata_sc.extent())

    layout_definition_sc = metadata_sc.layout()
    tile_layout_sc = layout_definition_sc.tileLayout()
    tile_layout_py = geopyspark.TileLayout(
        layoutCols=tile_layout_sc.layoutCols(),
        layoutRows=tile_layout_sc.layoutRows(),
        tileCols=tile_layout_sc.tileCols(),
        tileRows=tile_layout_sc.tileRows(),
    )
    layout_definition_py = geopyspark.LayoutDefinition(
        extent=convert_extent(layout_definition_sc.extent()), tileLayout=tile_layout_py
    )

    return geopyspark.Metadata(
        bounds=bounds_py, crs=crs_py, cell_type=cell_type_py, extent=extent_py, layout_definition=layout_definition_py
    )
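
For reference, a minimal usage sketch of the shared helper. The caller below is hypothetical: `metadata_sc` is assumed to be a py4j JavaObject wrapping a geotrellis TileLayerMetadata (e.g. obtained from a Scala file RDD factory), and the epoch-millisecond converter is an illustrative plain UTC conversion, not the `_instant_ms_to_day` / `_instant_ms_to_hour` helpers the actual collections pass in:

import datetime as dt
import logging

import geopyspark

from openeogeotrellis.collections import convert_scala_metadata

logger = logging.getLogger(__name__)


def layer_metadata_from_scala(metadata_sc) -> geopyspark.Metadata:
    # metadata_sc: a py4j JavaObject wrapping a geotrellis TileLayerMetadata
    # (illustrative assumption; in this repo it comes from the Scala side).
    # The converter below is a generic UTC conversion; the real callers pass
    # _instant_ms_to_day (S1 backscatter) or _instant_ms_to_hour (Sentinel-3).
    return convert_scala_metadata(
        metadata_sc,
        epoch_ms_to_datetime=lambda ms: dt.datetime.fromtimestamp(ms / 1000.0, tz=dt.timezone.utc),
        logger=logger,
    )

Passing the epoch-to-datetime conversion in as a callable is what lets the two call sites in the files below keep their collection-specific time handling while sharing the rest of the conversion logic.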
46 changes: 6 additions & 40 deletions openeogeotrellis/collections/s1backscatter_orfeo.py
@@ -29,6 +29,8 @@
 from openeo_driver.datastructs import SarBackscatterArgs
 from openeo_driver.errors import OpenEOApiException, FeatureUnsupportedException
 from openeo_driver.utils import smart_bool
+
+from openeogeotrellis.collections import convert_scala_metadata
 from openeogeotrellis.config import get_backend_config
 from openeogeotrellis.utils import lonlat_to_mercator_tile_indices, nullcontext, get_jvm, set_max_memory, \
     ensure_executor_logging
@@ -157,48 +159,12 @@ def _build_feature_rdd(
         )
         feature_pyrdd, layer_metadata_sc = self._load_feature_rdd(
             file_rdd_factory, projected_polygons=projected_polygons, from_date=from_date, to_date=to_date,
-            zoom=zoom, tile_size=tile_size,datacubeParams=datacubeParams
+            zoom=zoom, tile_size=tile_size, datacubeParams=datacubeParams
         )
-        layer_metadata_py = self._convert_scala_metadata(layer_metadata_sc)
-        return feature_pyrdd, layer_metadata_py
-
-    def _convert_scala_metadata(self, metadata_sc: JavaObject) -> geopyspark.Metadata:
-        """
-        Convert geotrellis TileLayerMetadata (Java) object to geopyspark Metadata object
-        """
-        logger.info("Convert {m!r} to geopyspark.Metadata".format(m=metadata_sc))
-        crs_py = str(metadata_sc.crs())
-        cell_type_py = str(metadata_sc.cellType())
-
-        def convert_key(key_sc: JavaObject) -> geopyspark.SpaceTimeKey:
-            return geopyspark.SpaceTimeKey(
-                col=key_sc.col(), row=key_sc.row(),
-                instant=_instant_ms_to_day(key_sc.instant())
-            )
-
-        bounds_sc = metadata_sc.bounds()
-        bounds_py = geopyspark.Bounds(minKey=convert_key(bounds_sc.minKey()), maxKey=convert_key(bounds_sc.maxKey()))
-
-        def convert_extent(extent_sc: JavaObject) -> geopyspark.Extent:
-            return geopyspark.Extent(extent_sc.xmin(), extent_sc.ymin(), extent_sc.xmax(), extent_sc.ymax())
-
-        extent_py = convert_extent(metadata_sc.extent())
-
-        layout_definition_sc = metadata_sc.layout()
-        tile_layout_sc = layout_definition_sc.tileLayout()
-        tile_layout_py = geopyspark.TileLayout(
-            layoutCols=tile_layout_sc.layoutCols(), layoutRows=tile_layout_sc.layoutRows(),
-            tileCols=tile_layout_sc.tileCols(), tileRows=tile_layout_sc.tileRows()
-        )
-        layout_definition_py = geopyspark.LayoutDefinition(
-            extent=convert_extent(layout_definition_sc.extent()),
-            tileLayout=tile_layout_py
-        )
-
-        return geopyspark.Metadata(
-            bounds=bounds_py, crs=crs_py, cell_type=cell_type_py,
-            extent=extent_py, layout_definition=layout_definition_py
-        )
+        layer_metadata_py = convert_scala_metadata(
+            layer_metadata_sc, epoch_ms_to_datetime=_instant_ms_to_day, logger=logger
+        )
+        return feature_pyrdd, layer_metadata_py
 
     # Mapping of `sar_backscatter` coefficient value to `SARCalibration` Lookup table value
     _coefficient_mapping = {
43 changes: 2 additions & 41 deletions openeogeotrellis/collections/sentinel3.py
@@ -18,6 +18,7 @@
 from pyspark import SparkContext, find_spark_home
 from scipy.spatial import cKDTree  # used for tuning the griddata interpolation settings
 
+from openeogeotrellis.collections import convert_scala_metadata
 from openeogeotrellis.utils import ensure_executor_logging, get_jvm, set_max_memory
 
 OLCI_PRODUCT_TYPE = "OL_1_EFR___"
@@ -120,7 +121,7 @@ def pyramid(metadata_properties, projected_polygons_native_crs, from_date, to_da
     pyrdd = geopyspark.create_python_rdd(j2p_rdd, serializer=serializer)
     pyrdd = pyrdd.map(json.loads)
 
-    layer_metadata_py = _convert_scala_metadata(metadata_sc)
+    layer_metadata_py = convert_scala_metadata(metadata_sc, epoch_ms_to_datetime=_instant_ms_to_hour, logger=logger)
 
     # -----------------------------------
     prefix = ""
@@ -696,46 +697,6 @@ def apply_LUT_on_band(in_data, LUT, nodata=None):
     return grid_values
 
 
-# TODO: reduce code duplication with S1BackscatterOrfeo._convert_scala_metadata
-def _convert_scala_metadata(metadata_sc: JavaObject) -> geopyspark.Metadata:
-    """
-    Convert geotrellis TileLayerMetadata (Java) object to geopyspark Metadata object
-    """
-    logger.info("Convert {m!r} to geopyspark.Metadata".format(m=metadata_sc))
-    crs_py = str(metadata_sc.crs())
-    cell_type_py = str(metadata_sc.cellType())
-
-    def convert_key(key_sc: JavaObject) -> geopyspark.SpaceTimeKey:
-        return geopyspark.SpaceTimeKey(
-            col=key_sc.col(), row=key_sc.row(),
-            instant=_instant_ms_to_hour(key_sc.instant())
-        )
-
-    bounds_sc = metadata_sc.bounds()
-    bounds_py = geopyspark.Bounds(minKey=convert_key(bounds_sc.minKey()), maxKey=convert_key(bounds_sc.maxKey()))
-
-    def convert_extent(extent_sc: JavaObject) -> geopyspark.Extent:
-        return geopyspark.Extent(extent_sc.xmin(), extent_sc.ymin(), extent_sc.xmax(), extent_sc.ymax())
-
-    extent_py = convert_extent(metadata_sc.extent())
-
-    layout_definition_sc = metadata_sc.layout()
-    tile_layout_sc = layout_definition_sc.tileLayout()
-    tile_layout_py = geopyspark.TileLayout(
-        layoutCols=tile_layout_sc.layoutCols(), layoutRows=tile_layout_sc.layoutRows(),
-        tileCols=tile_layout_sc.tileCols(), tileRows=tile_layout_sc.tileRows()
-    )
-    layout_definition_py = geopyspark.LayoutDefinition(
-        extent=convert_extent(layout_definition_sc.extent()),
-        tileLayout=tile_layout_py
-    )
-
-    return geopyspark.Metadata(
-        bounds=bounds_py, crs=crs_py, cell_type=cell_type_py,
-        extent=extent_py, layout_definition=layout_definition_py
-    )
-
-
 if __name__ == '__main__':
     logging.basicConfig(level="INFO")
 