-
Notifications
You must be signed in to change notification settings - Fork 20
/
_mdtools.py
903 lines (730 loc) · 27.6 KB
/
_mdtools.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
"""
STAC -> EO3 utilities.
Utilities for translating STAC Items to EO3 Datasets.
"""
import datetime
from collections import Counter
from copy import copy
from typing import (
Any,
Dict,
Iterable,
Iterator,
List,
Optional,
Sequence,
Set,
Tuple,
TypeVar,
Union,
)
import pystac.asset
import pystac.collection
import pystac.errors
import pystac.item
import shapely.geometry
from affine import Affine
from odc.geo import (
CRS,
XY,
Geometry,
MaybeCRS,
Resolution,
SomeResolution,
geom,
res_,
wh_,
xy_,
)
from odc.geo.geobox import AnchorEnum, GeoBox, GeoboxAnchor
from odc.geo.types import Unset
from odc.geo.xr import ODCExtension
from pystac.extensions.eo import EOExtension
from pystac.extensions.item_assets import ItemAssetsExtension
from pystac.extensions.projection import ProjectionExtension
from pystac.extensions.raster import RasterBand, RasterExtension
from toolz import dicttoolz
from ._model import (
BandKey,
BandQuery,
MDParseConfig,
ParsedItem,
RasterBandMetadata,
RasterCollectionMetadata,
RasterSource,
)
T = TypeVar("T")
ConversionConfig = Dict[str, Any]
EPSG4326 = CRS("EPSG:4326")
# Assets with these roles are ignored unless manually requested
ROLES_THUMBNAIL = {"thumbnail", "overview"}
# Used to detect image assets when media_type is missing
RASTER_FILE_EXTENSIONS = {"tif", "tiff", "jpeg", "jpg", "jp2", "img"}
def with_default(v: Optional[T], default_value: T) -> T:
"""
Replace ``None`` with default value.
:param v: Value that might be None
:param default_value: Default value of the same type as v
:return: ``v`` unless it is ``None`` then return ``default_value`` instead
"""
if v is None:
return default_value
return v
def _band_metadata_raw(asset: pystac.asset.Asset) -> List[RasterBand]:
bands = asset.to_dict().get("raster:bands", None)
if bands is None:
return []
return [RasterBand(props) for props in bands]
def band_metadata(
asset: pystac.asset.Asset, default: RasterBandMetadata
) -> List[RasterBandMetadata]:
"""
Compute band metadata from Asset raster extension with defaults from default.
:param asset: Asset with raster extension
:param default: Values to use for fallback
:return: List of BandMetadata constructed from raster:bands metadata
"""
bands: List[RasterBand] = []
try:
rext = RasterExtension.ext(asset)
if rext.bands is not None:
bands = rext.bands
except pystac.errors.ExtensionNotImplemented:
bands = _band_metadata_raw(asset)
if len(bands) == 0:
return [default]
def _norm_nodata(nodata) -> Union[float, None]:
if nodata is None:
return None
if isinstance(nodata, (int, float)):
return nodata
return float(nodata)
return [
RasterBandMetadata(
with_default(band.data_type, default.data_type),
with_default(_norm_nodata(band.nodata), default.nodata),
with_default(band.unit, default.unit),
)
for band in bands
]
def has_proj_ext(item: Union[pystac.item.Item, pystac.collection.Collection]) -> bool:
"""
Check if STAC Item or Collection has projection extension.
:returns: ``True`` if PROJ exetension is enabled
:returns: ``False`` if no PROJ extension was found
"""
try:
ProjectionExtension.validate_has_extension(item, add_if_missing=False)
return True
except pystac.errors.ExtensionNotImplemented:
return False
def has_raster_ext(item: Union[pystac.item.Item, pystac.collection.Collection]) -> bool:
"""
Check if STAC Item/Collection have EOExtension.
:returns: ``True`` if Raster exetension is enabled
:returns: ``False`` if no Rasetr extension was found
"""
try:
RasterExtension.validate_has_extension(item, add_if_missing=False)
return True
except pystac.errors.ExtensionNotImplemented:
return any(
ext_name.startswith("https://stac-extensions.github.io/raster/")
for ext_name in item.stac_extensions
)
def has_proj_data(asset: pystac.asset.Asset) -> bool:
"""
Check if STAC Asset contains proj extension data.
:returns: True if both ``.shape`` and ``.transform`` are set
:returns: False if either ``.shape`` or ``.transform`` are missing
"""
prj = ProjectionExtension.ext(asset)
return prj.shape is not None and prj.transform is not None
def is_raster_data(asset: pystac.asset.Asset, check_proj: bool = False) -> bool:
"""
Heuristic for determining if Asset points to raster data.
- If media type looks like image and roles don't look like thumbnail/overview
- If media type is undefined and roles contains "data"
- If media type is undefined and href ends on image extension
:param asset:
STAC Asset to check
:param check_proj:
when enabled check if asset is part of an Item that has projection
extension enabled and if yes only consider bands with
projection data as "raster data" bands.
"""
# pylint: disable=too-many-return-statements
# some of these are for readability
if check_proj:
if (
asset.owner is not None
and has_proj_ext(asset.owner)
and not has_proj_data(asset)
):
return False
roles: Set[str] = set(asset.roles or [])
media_type = asset.media_type
if media_type is None:
# Type undefined
# Look if it has data role
if "data" in roles:
return True
if "metadata" in roles:
return False
elif "image/" in media_type:
# Image:
# False -- when thumbnail
# True -- otherwise
if any(r in roles for r in ROLES_THUMBNAIL):
return False
return True
else:
# Some type that is not `image/*`
return False
ext = asset.href.split(".")[-1].lower()
return ext in RASTER_FILE_EXTENSIONS
def mk_1x1_geobox(g: Geometry) -> GeoBox:
"""
Construct 1x1 pixels GeoBox tightly enclosing supplied geometry.
:param geom: Geometry in whatever projection
:return: GeoBox object such that geobox.extent.contains(geom) is True, geobox.shape == (1,1)
"""
x1, y1, x2, y2 = g.boundingbox
# note that Y axis is inverted
# 0,0 -> X_min, Y_max
# 1,1 -> X_max, Y_min
return GeoBox((1, 1), Affine((x2 - x1), 0, x1, 0, (y1 - y2), y2), g.crs)
def asset_geobox(asset: pystac.asset.Asset) -> GeoBox:
"""
Compute GeoBox from STAC Asset.
This only works if ProjectionExtension is used with the
following properties populated:
- shape
- transform
- CRS
:raises ValueError: when transform,shape or crs are missing
:raises ValueError: when transform is not Affine.
"""
try:
_proj = ProjectionExtension.ext(asset)
except pystac.errors.ExtensionNotImplemented:
raise ValueError("No projection extension defined") from None
if _proj.shape is None or _proj.transform is None or _proj.crs_string is None:
raise ValueError(
"The asset must have the following fields (from the projection extension):"
" shape, transform, and one of an epsg, wkt2, or projjson"
)
h, w = _proj.shape
if len(_proj.transform) not in (6, 9):
raise ValueError("Asset transform must be 6 or 9 elements in size")
if len(_proj.transform) == 9 and _proj.transform[-3:] != [0, 0, 1]:
raise ValueError(f"Asset transform is not affine: {_proj.transform}")
affine = Affine(*_proj.transform[:6])
return GeoBox(wh_(w, h), affine, _proj.crs_string)
def geobox_gsd(geobox: GeoBox) -> float:
"""
Compute ground sampling distance of a given GeoBox.
:param geobox: input :class:`~odc.geo.geobox.GeoBox`
:returns: Minimum ground sampling distance along X/Y
"""
return min(map(abs, [geobox.transform.a, geobox.transform.e])) # type: ignore
def compute_eo3_grids(
assets: Dict[str, pystac.asset.Asset]
) -> Tuple[Dict[str, GeoBox], Dict[str, str]]:
"""
Compute a minimal set of eo3 grids.
Pick default one, give names to non-default grids, while keeping track of
which asset has which grid
Assets must have ProjectionExtension with shape, transform and crs information
populated.
"""
assert len(assets) > 0
geoboxes = dicttoolz.valmap(asset_geobox, assets)
return _group_geoboxes(geoboxes)
def _group_geoboxes(
geoboxes: Dict[str, GeoBox]
) -> Tuple[Dict[str, GeoBox], Dict[str, str]]:
# pylint: disable=too-many-locals
def gbox_name(geobox: GeoBox) -> str:
gsd = geobox_gsd(geobox)
return f"g{gsd:g}"
# GeoBox to list of bands that share same footprint
grids: Dict[GeoBox, List[str]] = {}
crs: Optional[CRS] = None
for k, geobox in geoboxes.items():
grids.setdefault(geobox, []).append(k)
# Default grid is the one with highest count of bands
# If there is a tie pick one with the smallest ground sampling distance
def gbox_score(geobox: GeoBox) -> Tuple[int, float]:
return (-len(grids[geobox]), geobox_gsd(geobox))
# locate default grid
g_default, *_ = sorted(grids, key=gbox_score)
named_grids: Dict[str, GeoBox] = {}
band2grid: Dict[str, str] = {}
for grid, bands in grids.items():
if crs is None:
crs = grid.crs
grid_name = "default" if grid is g_default else gbox_name(grid)
if grid_name in named_grids:
band, *_ = bands
grid_name = f"{grid_name}-{band}"
named_grids[grid_name] = grid
for band in bands:
band2grid[band] = grid_name
return named_grids, band2grid
def band2grid_from_gsd(assets: Dict[str, pystac.asset.Asset]) -> Dict[str, str]:
grids: Dict[float, List[str]] = {}
for name, asset in assets.items():
gsd = asset.common_metadata.gsd
gsd = 0 if gsd is None else gsd
gsd_normed = float(f"{gsd:g}")
grids.setdefault(gsd_normed, []).append(name)
# Default grid is one with largest number of bands
# .. and lowest gsd when there is a tie
(_, default_gsd), *_ = sorted((-len(bands), gsd) for gsd, bands in grids.items())
band2grid = {}
for gsd, bands in grids.items():
grid_name = "default" if gsd == default_gsd else f"g{gsd:g}"
for band in bands:
band2grid[band] = grid_name
return band2grid
def _extract_aliases(
asset_name: str, asset: pystac.asset.Asset, block_list: Set[str]
) -> Iterator[Tuple[str, int, BandKey]]:
try:
eo = EOExtension.ext(asset)
except pystac.errors.ExtensionNotImplemented:
return
if eo.bands is None:
return
for idx, band in enumerate(eo.bands):
for alias in [band.name, band.common_name]:
if alias is not None and alias not in block_list:
yield (alias, len(eo.bands), (asset_name, idx + 1))
def alias_map_from_eo(item: pystac.item.Item) -> Dict[str, List[BandKey]]:
"""
Generate mapping ``common name -> canonical name``.
For all unique common names defined on the Item's assets via the eo
extension, record a mapping to the asset key ("canonical name"). Non-unique
common names are ignored with a warning unless ``quiet`` flag is set.
:param item: STAC :class:`~pystac.item.Item` to process
:return: common name to (asset, idx) mapping
"""
aliases: Dict[str, List[BandKey]] = {}
asset_band_counts: Dict[str, int] = {}
asset_names = set(item.assets)
for asset_name, asset in item.assets.items():
for alias, count, bkey in _extract_aliases(asset_name, asset, asset_names):
aliases.setdefault(alias, []).append(bkey)
asset_band_counts[asset_name] = count
# Alias pointing to an asset with fewer bands is
# of higher priority, 1-band data asset vs 3 band visual
def _cmp(x):
asset, _ = x
return (asset_band_counts[asset], asset)
return {alias: sorted(bands, key=_cmp) for alias, bands in aliases.items()}
def mk_sample_item(collection: pystac.collection.Collection) -> pystac.item.Item:
try:
item_assets = ItemAssetsExtension.ext(collection).item_assets
except pystac.errors.ExtensionNotImplemented:
raise ValueError(
"This only works on Collections with ItemAssets extension"
) from None
item = pystac.item.Item(
"sample",
None,
None,
datetime.datetime(2020, 1, 1),
{},
stac_extensions=collection.stac_extensions,
collection=collection,
)
for name, asset in item_assets.items():
_asset = dict(href="")
_asset.update(asset.to_dict())
item.add_asset(name, pystac.asset.Asset.from_dict(_asset))
return item
def _collection_id(item: pystac.item.Item) -> str:
# choose first that is set
# 1. collection_id
# 2. odc:product
# 3. "_"
if item.collection_id is None:
# early ODC data
return str(item.properties.get("odc:product", "_"))
return str(item.collection_id)
class _CMDAssembler:
"""
Incrementally build up collection metadata from item stream.
Expect to see items of the same collection only.
"""
# pylint: disable=too-few-public-methods
def __init__(
self, collection_id: str, cfg: Optional[ConversionConfig] = None
) -> None:
if cfg is None:
cfg = {}
self._cfg = MDParseConfig.from_dict(collection_id, cfg)
self.check_proj: bool = not self._cfg.ignore_proj
self.has_proj: Optional[bool] = None
self.collection_id = collection_id
self.md: Optional[RasterCollectionMetadata] = None
self._asset_keeps: Dict[str, bool] = {}
self._known_assets: Set[str] = set()
def _keep(self, kv: Tuple[str, pystac.asset.Asset]) -> bool:
c = self._cfg
name, asset = kv
if name in c.band_cfg:
return True
assert self.has_proj is not None
return is_raster_data(asset, check_proj=self.has_proj)
def _extract_bands(
self, name: str, asset: pystac.asset.Asset
) -> Dict[BandKey, RasterBandMetadata]:
c = self._cfg
bm = c.band_cfg.get(name, None)
if bm is not None:
return {(name, 1): copy(bm)}
return {
(name, idx + 1): bm
for idx, bm in enumerate(band_metadata(asset, c.band_defaults))
}
def _bootstrap(self, item: pystac.item.Item):
"""Called on the very first item only."""
self.has_proj = has_proj_ext(item) if self.check_proj else False
data_bands: Dict[str, pystac.asset.Asset] = dicttoolz.itemfilter(
self._keep, item.assets
)
# found no data bands with check_proj=True
# so try again with check_proj=False
if len(data_bands) == 0 and self.has_proj:
self.has_proj = False
self.check_proj = False
data_bands = dicttoolz.itemfilter(self._keep, item.assets)
self._asset_keeps = {name: name in data_bands for name in item.assets}
self._known_assets = set(self._asset_keeps)
bands: Dict[BandKey, RasterBandMetadata] = {}
aliases = alias_map_from_eo(item)
# 1. If band in user config -- use that
# 2. Use data from raster extension (with fallback to "*" config)
# 3. Use config for "*" from user config as fallback
for name, asset in data_bands.items():
bands.update(self._extract_bands(name, asset))
for alias, bkey in self._cfg.aliases.items():
aliases.setdefault(alias, []).insert(0, bkey)
# We assume that grouping of data bands into grids is consistent across
# entire collection, so we compute it once and keep it
if self.has_proj:
_, band2grid = compute_eo3_grids(data_bands)
else:
band2grid = band2grid_from_gsd(data_bands)
self.md = RasterCollectionMetadata(
self.collection_id,
bands=bands,
aliases=aliases,
has_proj=self.has_proj,
band2grid=band2grid,
)
def update(self, item: pystac.item.Item):
# pylint: disable=too-many-locals,too-many-branches
if self.md is None:
self._bootstrap(item)
return
new_assets = set(item.assets) - self._known_assets
if len(new_assets) == 0:
return
new_data_assets: List[Tuple[str, pystac.asset.Asset]] = []
for name in new_assets:
asset = item.assets[name]
is_data = self._keep((name, asset))
self._asset_keeps[name] = is_data
if is_data:
new_data_assets.append((name, asset))
self._known_assets = set(self._asset_keeps)
# some new assets that we don't care about
if len(new_data_assets) == 0:
return
bands = self.md.bands
aliases = self.md.aliases
band2grid = self.md.band2grid
# GeoBox -> grid name
grid2band: Dict[GeoBox, str] = {}
if self.has_proj:
for name, asset in item.assets.items():
if (grid_name := band2grid.get(name, None)) is not None:
grid2band[asset_geobox(asset)] = grid_name
for name, asset in new_data_assets:
bands.update(self._extract_bands(name, asset))
# update alias table
for alias, count, bkey in _extract_aliases(name, asset, self._known_assets):
_bands = aliases.setdefault(alias, [])
if count == 1:
_bands.insert(0, bkey)
else:
_bands.append(bkey)
if self.has_proj:
band2grid[name] = grid2band.get(asset_geobox(asset), f"grid-{name}")
def extract_collection_metadata(
item: pystac.item.Item, cfg: Optional[ConversionConfig] = None
) -> RasterCollectionMetadata:
"""
Use sample item to figure out raster bands within the collection.
1. Decide which assets contain raster data
2. Extract metadata about about rasters from STAC or from ``cfg``
3. See if ``proj`` data is available and group bands by resolution
4. Construct alias map from common names and user config
:param item: Representative STAC item from a collection.
:param cfg: Optional user configuration
:return: :py:class:`~odc.stac._model.RasterCollectionMetadata`
"""
collection_id = _collection_id(item)
proc = _CMDAssembler(collection_id, cfg)
proc.update(item)
assert proc.md is not None
return proc.md
def parse_item(
item: pystac.item.Item,
template: Union[RasterCollectionMetadata, ConversionConfig, None] = None,
) -> ParsedItem:
"""
Extract raster band information relevant for data loading.
:param item: STAC Item
:param template: Common collection level information
:return: ``ParsedItem``
"""
# pylint: disable=too-many-locals
if not isinstance(template, RasterCollectionMetadata):
template = extract_collection_metadata(item, template)
band2grid = template.band2grid
has_proj = False if template.has_proj is False else has_proj_ext(item)
_assets = item.assets
_grids: Dict[str, GeoBox] = {}
bands: Dict[BandKey, RasterSource] = {}
geometry: Optional[Geometry] = None
if item.geometry is not None:
geometry = Geometry(item.geometry, EPSG4326)
def _get_grid(grid_name: str, asset: pystac.asset.Asset) -> GeoBox:
grid = _grids.get(grid_name, None)
if grid is not None:
return grid
grid = asset_geobox(asset)
_grids[grid_name] = grid
return grid
for bk, meta in template.bands.items():
asset_name, _ = bk
asset = _assets.get(asset_name)
if asset is None:
continue
grid_name = band2grid.get(asset_name, "default")
geobox: Optional[GeoBox] = _get_grid(grid_name, asset) if has_proj else None
uri = asset.get_absolute_href()
if uri is None:
raise ValueError(
f"Can not determine absolute path for band: {asset_name}"
) # pragma: no cover (https://github.com/stac-utils/pystac/issues/754)
bands[bk] = RasterSource(uri=uri, geobox=geobox, meta=meta)
md = item.common_metadata
return ParsedItem(
item.id,
template,
bands,
geometry,
datetime=item.datetime,
datetime_range=(md.start_datetime, md.end_datetime),
href=item.get_self_href(),
)
def parse_items(
items: Iterable[pystac.item.Item], cfg: Optional[ConversionConfig] = None
) -> Iterator[ParsedItem]:
proc_cache: Dict[str, _CMDAssembler] = {}
for item in items:
collection_id = _collection_id(item)
proc = proc_cache.get(collection_id, None)
if proc is None:
proc = _CMDAssembler(collection_id, cfg)
proc_cache[collection_id] = proc
proc.update(item)
yield parse_item(item, proc.md)
def _auto_load_params(
items: Sequence[ParsedItem], bands: Optional[Sequence[str]] = None
) -> Optional[Tuple[Optional[CRS], Resolution]]:
def _key(item: ParsedItem) -> Optional[Tuple[Optional[CRS], Resolution]]:
gbx = item.geoboxes(bands)
if len(gbx) == 0:
return None
g, *_ = gbx
return (g.crs, g.resolution)
best = Counter(filter(lambda x: x is not None, map(_key, items))).most_common(1)
if len(best) == 0:
return None
best, _ = best[0]
assert best is not None # filter is too hard for mypy
crs, res = best
return (crs, res)
def _normalize_geometry(xx: Any) -> Geometry:
if isinstance(xx, shapely.geometry.base.BaseGeometry):
return Geometry(xx, "epsg:4326")
if isinstance(xx, Geometry):
return xx
if isinstance(xx, dict):
return Geometry(xx, "epsg:4326")
# GeoPandas
_geo = getattr(xx, "__geo_interface__", None)
if _geo is None:
raise ValueError("Can't interpret value as geometry")
_crs = getattr(xx, "crs", "epsg:4326")
return Geometry(_geo, _crs)
def _compute_bbox(
items: Iterable[ParsedItem],
crs: MaybeCRS,
bands: BandQuery = None,
) -> geom.BoundingBox:
def bboxes(items: Iterable[ParsedItem]) -> Iterator[geom.BoundingBox]:
crs0 = crs
for item in items:
g = item.safe_geometry(crs0, bands=bands)
assert g is not None
if crs0 is crs:
# If crs is something like "utm", make sure
# same one is used going forward
crs0 = g.crs
yield g.boundingbox
return geom.bbox_union(bboxes(items))
def _align2anchor(
align: Optional[Union[float, int, XY[float]]], resolution: SomeResolution
) -> GeoboxAnchor:
if align is None:
return AnchorEnum.EDGE
if isinstance(align, (float, int)):
align = xy_(align, align)
# support old-style "align", which is basically anchor but in CRS units
ax, ay = align.xy
if ax == 0 and ay == 0:
return AnchorEnum.EDGE
resolution = res_(resolution)
return xy_(ax / abs(resolution.x), ay / abs(resolution.y))
def output_geobox(
items: Sequence[ParsedItem],
bands: Optional[Sequence[str]] = None,
*,
crs: MaybeCRS = Unset(),
resolution: Optional[SomeResolution] = None,
anchor: Optional[GeoboxAnchor] = None,
align: Optional[Union[float, int, XY[float]]] = None,
geobox: Optional[GeoBox] = None,
like: Optional[Any] = None,
geopolygon: Optional[Any] = None,
bbox: Optional[Tuple[float, float, float, float]] = None,
lon: Optional[Tuple[float, float]] = None,
lat: Optional[Tuple[float, float]] = None,
x: Optional[Tuple[float, float]] = None,
y: Optional[Tuple[float, float]] = None,
) -> Optional[GeoBox]:
# pylint: disable=too-many-locals,too-many-branches,too-many-statements,too-many-return-statements
# geobox, like --> GeoBox
# lon,lat --> geopolygon[epsg:4326]
# bbox --> geopolygon[epsg:4326]
# x,y,crs --> geopolygon[crs]
# [items] --> crs, geopolygon[crs]
# [items] --> crs, resolution
# geopolygon, crs, resolution[, anchor|align] --> GeoBox
params = {
k
for k, v in dict(
x=x,
y=y,
lon=lon,
lat=lat,
crs=crs,
resolution=resolution,
align=align,
anchor=anchor,
like=like,
geopolygon=geopolygon,
bbox=bbox,
geobox=geobox,
).items()
if not (v is None or isinstance(v, Unset))
}
def report_extra_args(primary: str, *ok_args):
args = params - set([primary, *ok_args])
if len(args) > 0:
raise ValueError(
f"Too many arguments when using `{primary}=`: {','.join(args)}"
)
def check_arg_sets(*args: str) -> bool:
x = params & set(args)
if len(x) == 0 or len(x) == len(args):
return True
return False
if geobox is not None:
report_extra_args("geobox")
return geobox
if like is not None:
report_extra_args("like")
if isinstance(like, GeoBox):
return like
_odc = getattr(like, "odc", None)
if _odc is None:
raise ValueError("No geospatial info on `like=` input")
assert isinstance(_odc, ODCExtension)
if _odc.geobox is None:
raise ValueError("No geospatial info on `like=` input")
assert isinstance(_odc.geobox, GeoBox)
return _odc.geobox
if not check_arg_sets("x", "y"):
raise ValueError("Need to supply both x= and y=")
if not check_arg_sets("lon", "lat"):
raise ValueError("Need to supply both lon= and lat=")
if isinstance(crs, Unset):
crs = None
grid_params = ("crs", "align", "anchor", "resolution")
query_crs: Optional[CRS] = None
if geopolygon is not None:
geopolygon = _normalize_geometry(geopolygon)
query_crs = geopolygon.crs
# Normalize x.y|lon.lat|bbox|geopolygon arguments to a geopolygon|None
if geopolygon is not None:
report_extra_args("geopolygon", *grid_params)
elif bbox is not None:
report_extra_args("bbox", *grid_params)
x0, y0, x1, y1 = bbox
geopolygon = geom.box(x0, y0, x1, y1, EPSG4326)
elif lat is not None and lon is not None:
# lon=(x0, x1), lat=(y0, y1)
report_extra_args("lon,lat", "lon", "lat", *grid_params)
x0, x1 = sorted(lon)
y0, y1 = sorted(lat)
geopolygon = geom.box(x0, y0, x1, y1, EPSG4326)
elif x is not None and y is not None:
if crs is None:
raise ValueError("Need to supply `crs=` when using `x=`, `y=`.")
report_extra_args("x,y", "x", "y", *grid_params)
x0, x1 = sorted(x)
y0, y1 = sorted(y)
geopolygon = geom.box(x0, y0, x1, y1, crs)
if crs is None or resolution is None:
rr = _auto_load_params(items, bands)
if rr is not None:
_crs, _res = rr
else:
_crs, _res = None, None
if crs is None:
crs = _crs or query_crs
if resolution is None:
resolution = _res
if resolution is None or crs is None:
return None
if anchor is None:
anchor = _align2anchor(align, resolution)
if geopolygon is not None:
assert isinstance(geopolygon, Geometry)
return GeoBox.from_geopolygon(
geopolygon,
resolution=resolution,
crs=crs,
anchor=anchor,
)
# compute from parsed items
_bbox = _compute_bbox(items, crs)
return GeoBox.from_bbox(_bbox, resolution=resolution, anchor=anchor)