Skip to content

Commit

Permalink
feat: add thread pool tiling
Browse files Browse the repository at this point in the history
  • Loading branch information
gadomski committed Dec 21, 2022
1 parent 5a2088e commit 957fa53
Showing 1 changed file with 49 additions and 11 deletions.
60 changes: 49 additions & 11 deletions src/stactools/usda_cdl/tile.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import logging
import threading
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from pathlib import Path
from typing import List
from typing import List, Optional

import rasterio
import rasterio.shutil
Expand All @@ -11,6 +14,8 @@

# NOTE(review): presumably the raster resolution; units and usage are not
# visible in this part of the file — confirm.
RESOLUTION = 30
DEFAULT_WINDOW_SIZE = 3000 # pixels
# Default value for the ``max_workers`` argument of the tiling functions.
DEFAULT_MAX_WORKERS = 8
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


@dataclass
Expand Down Expand Up @@ -38,34 +43,50 @@ def rasterio_window(self) -> rasterio.windows.Window:


def tile_zipfile(
    infile: Path,
    directory: Path,
    size: int = DEFAULT_WINDOW_SIZE,
    max_workers: int = DEFAULT_MAX_WORKERS,
) -> List[Path]:
    """Tiles an input GeoTIFF (wrapped in a zipfile).

    The archive is opened in place through a ``zip://`` path, so it must
    contain a member named ``<archive stem>.tif``.

    Args:
        infile: Path to the ``.zip`` archive.
        directory: Forwarded unchanged to ``_tile_dataset`` (presumably the
            directory the tiles are written to -- confirm against
            ``_tile_dataset``).
        size: Window size used to split the dataset, in pixels.
        max_workers: Maximum number of worker threads used while tiling.

    Returns:
        The list of paths returned by ``_tile_dataset``.

    Raises:
        ValueError: If ``infile`` does not have a ``.zip`` suffix.
    """
    if infile.suffix != ".zip":
        raise ValueError(f"Infile should end in .zip: {infile}")
    # The GeoTIFF inside the archive is expected to share the archive's stem.
    zip_path = f"zip://{infile}!/{infile.stem}.tif"
    with rasterio.open(zip_path) as dataset:
        return _tile_dataset(
            dataset, Metadata.from_href(infile.stem), directory, size, max_workers
        )


def tile_geotiff(
    infile: Path,
    directory: Path,
    size: int = DEFAULT_WINDOW_SIZE,
    max_workers: int = DEFAULT_MAX_WORKERS,
) -> List[Path]:
    """Tiles an input GeoTIFF.

    Args:
        infile: Path to the GeoTIFF; it is opened directly with rasterio and
            its string form is used to build the tile metadata.
        directory: Forwarded unchanged to ``_tile_dataset`` (presumably the
            directory the tiles are written to -- confirm against
            ``_tile_dataset``).
        size: Window size used to split the dataset, in pixels.
        max_workers: Maximum number of worker threads used while tiling.

    Returns:
        The list of paths returned by ``_tile_dataset``.
    """
    with rasterio.open(infile) as dataset:
        return _tile_dataset(
            dataset, Metadata.from_href(str(infile)), directory, size, max_workers
        )


def _tile_dataset(
dataset: DatasetReader, metadata: Metadata, directory: Path, size: int
dataset: DatasetReader,
metadata: Metadata,
directory: Path,
size: int,
max_workers: int,
) -> List[Path]:
windows = _create_windows(dataset, size)
paths = list()
for window in windows:
read_lock = threading.Lock()

def tile(window: Window) -> Optional[Path]:
rasterio_window = window.rasterio_window()
data = dataset.read(1, window=rasterio_window)
with read_lock:
data = dataset.read(1, window=rasterio_window)
if not data.any():
continue
return None
transform = dataset.window_transform(rasterio_window)
profile = {
"driver": "GTiff",
Expand All @@ -84,7 +105,24 @@ def _tile_dataset(
if colormap:
open_memory_file.write_colormap(1, colormap)
rasterio.shutil.copy(open_memory_file, path, **metadata.cog_profile)
paths.append(path)
return path

paths = list()
skipped = 0
written = 0
num_windows = len(windows)
interval = int(num_windows / 100) or 1
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for i, path in enumerate(executor.map(tile, windows)):
if path is None:
skipped += 1
else:
written += 1
paths.append(path)
if i % interval == 0:
logger.info(
f"[{i + 1}/{num_windows}] written={written}, skipped={skipped}"
)
return paths


Expand Down

0 comments on commit 957fa53

Please sign in to comment.