Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sentinel-1 processor [WIP] #2

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions stacchip/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,26 @@ def get_stats(self, x: int, y: int) -> Tuple[float, float]:
nodata_percentage = np.sum(scl == self.nodata_value) / scl.size

return cloud_percentage, nodata_percentage


class Sentinel1Indexer(ChipIndexer):
    """Chip indexer for Sentinel-1 GRD items.

    Radar data has no cloud mask, so the cloud percentage reported by
    :meth:`get_stats` is always ``0.0``; only the nodata fraction of the
    VV band is computed per chip.
    """

    # Pixel value that marks nodata in the VV band.
    nodata_value = 0

    @cached_property
    def vv(self):
        """Load the VV band once, resampled to the indexer's target shape."""
        with rasterio.open(self.item.assets["vv"].href) as src:
            return src.read(
                out_shape=(1, *self.shape), resampling=Resampling.nearest
            )[0]

    def get_stats(self, x: int, y: int) -> Tuple[float, float]:
        """Return ``(cloud_percentage, nodata_percentage)`` for chip (x, y).

        The cloud percentage is always 0.0 (no cloud information for SAR),
        matching the declared return type and the tuple contract of the
        sibling indexer's ``get_stats``.
        """
        chip = self.vv[
            y * self.chip_size : (y + 1) * self.chip_size,
            x * self.chip_size : (x + 1) * self.chip_size,
        ]
        nodata_percentage = float(np.sum(chip == self.nodata_value) / chip.size)
        return 0.0, nodata_percentage
112 changes: 112 additions & 0 deletions stacchip/processors/sentinel_1_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import json
import os
import random
from pathlib import Path
from urllib.parse import urlparse

import boto3
import geopandas as gp
import pyarrow as pa
import pystac_client
from geoarrow.pyarrow import io

from stacchip.indexer import Sentinel1Indexer

# STAC API endpoint queried for Sentinel-1 scenes.
STAC_API = "https://earth-search.aws.element84.com/v1"
# Asset keys to keep on each item; every other asset is deleted.
# NOTE(review): these are upper-case, while Sentinel1Indexer reads
# item.assets["vv"] in lower case — confirm the collection's asset key casing.
S1_ASSETS = [
    "VV",
    "VH",
]
# STAC collection name for Sentinel-1 GRD, also used as the S3 key prefix.
PLATFORM_NAME = "sentinel-s1-grd"
# Maximum scene-level nodata pixel percentage accepted by the STAC search.
SCENE_NODATA_LIMIT = 20
# Quarterly datetime-range templates; format with a concrete year before use.
quartals = [
    "{year}-01-01/{year}-03-31",
    "{year}-04-01/{year}-06-30",
    "{year}-07-01/{year}-09-30",
    "{year}-10-01/{year}-12-31",
]


def process_mgrs_tile(index: int, mgrs_source: str, bucket: str) -> None:
    """Process one MGRS tile: search Sentinel-1 GRD scenes, copy their VV/VH
    assets into *bucket*, and build a stacchip index for each selected item.

    Args:
        index: Row index into the MGRS source file (one tile per batch job).
        mgrs_source: Path/URL of a vector file readable by geopandas with a
            ``name`` column and tile geometries.
        bucket: Destination S3 bucket for copied assets and index files.
    """
    # Prepare resources for the job.
    catalog = pystac_client.Client.open(STAC_API)
    s3 = boto3.resource("s3")
    data = gp.read_file(mgrs_source)
    row = data.iloc[index]

    print("MGRS", row["name"])
    # Seed with the tile index so the sampled years are reproducible per tile.
    random.seed(index)
    for year in random.sample(range(2018, 2024), 2):
        print(f"Year {year}")
        for quartal in quartals:
            print(f"Quartal {quartal.format(year=year)}")
            search = catalog.search(
                collections=[PLATFORM_NAME],
                datetime=quartal.format(year=year),
                max_items=2,
                intersects=row.geometry,
                query={
                    "grid:code": {
                        "eq": f"MGRS-{row['name']}",
                    },
                    "s1:nodata_pixel_percentage": {"lte": SCENE_NODATA_LIMIT},
                },
            )
            # get_items() returns a generator and cannot be subscripted;
            # materialize the results and guard against empty searches.
            matches = search.item_collection()
            if not matches:
                print("No items found for this quartal, skipping")
                continue
            item = matches[0]

            # Keep only the S1 assets; copy them into our bucket and
            # repoint the item hrefs at the copies.
            # NOTE(review): S1_ASSETS keys are upper-case while the indexer
            # reads item.assets["vv"] — confirm the asset key casing.
            for key in list(item.assets.keys()):
                if key not in S1_ASSETS:
                    del item.assets[key]
                else:
                    url = urlparse(item.assets[key].href)
                    copy_source = {
                        "Bucket": "sentinel-cogs",
                        "Key": url.path.lstrip("/"),
                    }
                    print(f"Copying {copy_source}")
                    new_key = (
                        f"{PLATFORM_NAME}/{item.id}/{Path(item.assets[key].href).name}"
                    )
                    s3.meta.client.copy(copy_source, bucket, new_key)
                    item.assets[key].href = f"s3://{bucket}/{new_key}"

            # Serialize the STAC item for upload.
            data_string = json.dumps(item.to_dict())
            print(data_string)

            s3_bucket = s3.Bucket(name=bucket)
            # NOTE(review): STAC item upload is disabled while WIP — confirm
            # before enabling.
            # s3_bucket.put_object(
            #     Key=f"{PLATFORM_NAME}/{item.id}/stac_item.json",
            #     Body=data_string,
            # )

            indexer = Sentinel1Indexer(item)
            # Use a distinct name: "index" is already the tile-row parameter.
            chip_index = indexer.create_index()

            writer = pa.BufferOutputStream()
            io.write_geoparquet_table(chip_index, writer)
            body = bytes(writer.getvalue())
            # Centralize the index files to make combining them easier later on.
            # NOTE(review): index upload is disabled while WIP — confirm before
            # enabling.
            # s3_bucket.put_object(
            #     Body=body,
            #     Key=f"index/{PLATFORM_NAME}/{item.id}/index_{item.id}.parquet",
            # )

def process() -> None:
    """Batch entry point: read job configuration from environment variables
    and process the corresponding MGRS tile.

    Raises:
        ValueError: If any required environment variable is missing.
    """
    required = (
        "AWS_BATCH_JOB_ARRAY_INDEX",
        "STACCHIP_MGRS_SOURCE",
        "STACCHIP_BUCKET",
    )
    for name in required:
        if name not in os.environ:
            raise ValueError(f"{name} env var not set")

    process_mgrs_tile(
        int(os.environ["AWS_BATCH_JOB_ARRAY_INDEX"]),
        os.environ["STACCHIP_MGRS_SOURCE"],
        os.environ["STACCHIP_BUCKET"],
    )
Loading