1 change: 1 addition & 0 deletions _toc.yml
@@ -15,6 +15,7 @@ parts:
chapters:
- file: notebooks/advanced/Parquet_Reference_Storage
- file: notebooks/advanced/Pangeo_Forge
- file: notebooks/advanced/appending

- caption: Generating Reference Files
chapters:
332 changes: 332 additions & 0 deletions notebooks/advanced/appending.ipynb
@@ -0,0 +1,332 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Appending to Kerchunk references\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Overview\n",
"\n",
"In this tutorial we'll show how to append to a pre-existing Kerchunk reference. We'll use the same datasets as in the [NetCDF reference generation](../generating_references/NetCDF.ipynb) example. \n",
"\n",
"## Prerequisites\n",
"| Concepts | Importance | Notes |\n",
"| --- | --- | --- |\n",
"| [Kerchunk Basics](../foundations/kerchunk_basics) | Required | Core |\n",
"| [Multiple Files and Kerchunk](../foundations/kerchunk_multi_file) | Required | Core |\n",
"| [Multi-File Datasets with Kerchunk](../case_studies/ARG_Weather.ipynb) | Required | IO/Visualization |\n",
"\n",
"- **Time to learn**: 45 minutes\n",
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Imports"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import logging\n",
"from tempfile import TemporaryDirectory\n",
"\n",
"import dask\n",
"import fsspec\n",
"import ujson\n",
"import xarray as xr\n",
"from distributed import Client\n",
"from kerchunk.combine import MultiZarrToZarr\n",
"from kerchunk.hdf import SingleHdf5ToZarr"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create Input File List\n",
"\n",
"Here we are using `fsspec's` glob functionality along with the *`*`* wildcard operator and some string slicing to grab a list of NetCDF files from a `s3` `fsspec` filesystem. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Initiate fsspec filesystems for reading\n",
"fs_read = fsspec.filesystem(\"s3\", anon=True, skip_instance_cache=True)\n",
"\n",
"files_paths = fs_read.glob(\n",
" \"s3://smn-ar-wrf/DATA/WRF/DET/2022/12/31/12/WRFDETAR_01H_20221231_12_*\"\n",
")\n",
"\n",
"# Here we prepend the prefix 's3://', which points to AWS.\n",
"file_pattern = sorted([\"s3://\" + f for f in files_paths])"
]
},
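{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check, we can confirm the glob matched the hourly files we expect before generating any references. (The exact count depends on what is currently available in the bucket.)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# How many files matched, and what do the paths look like?\n",
"print(len(file_pattern))\n",
"file_pattern[:2]"
]
},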
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# This dictionary will be passed as kwargs to `fsspec`. For more details, check out the\n",
"# `foundations/kerchunk_basics` notebook.\n",
"so = dict(mode=\"rb\", anon=True, default_fill_cache=False, default_cache_type=\"first\")\n",
"\n",
"# We are creating a temporary directory to store the .json reference files\n",
"# Alternately, you could write these to cloud storage.\n",
"td = TemporaryDirectory()\n",
"temp_dir = td.name"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Start a Dask Client\n",
"\n",
"To parallelize the creation of our reference files, we will use `Dask`. For a detailed guide on how to use Dask and Kerchunk, see the Foundations notebook: [Kerchunk and Dask](../foundations/kerchunk_dask).\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"client = Client(n_workers=8, silence_logs=logging.ERROR)\n",
"client"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Create a `Kerchunk` reference file for the first 24 hours"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"first_24_hrs = file_pattern[:24]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Use Kerchunk's `SingleHdf5ToZarr` method to create a `Kerchunk` index from\n",
"# a NetCDF file.\n",
"\n",
"\n",
"def generate_json_reference(fil, output_dir: str):\n",
" with fs_read.open(fil, **so) as infile:\n",
" h5chunks = SingleHdf5ToZarr(infile, fil, inline_threshold=300)\n",
" fname = fil.split(\"/\")[-1].strip(\".nc\")\n",
" outf = f\"{output_dir}/{fname}.json\"\n",
" with open(outf, \"wb\") as f:\n",
" f.write(ujson.dumps(h5chunks.translate()).encode())\n",
" return outf\n",
"\n",
"\n",
"# Generate Dask Delayed objects\n",
"tasks = [dask.delayed(generate_json_reference)(fil, temp_dir) for fil in first_24_hrs]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Start parallel processing\n",
"import warnings\n",
"\n",
"warnings.filterwarnings(\"ignore\")\n",
"dask.compute(tasks)"
]
},
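{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before combining, it can help to peek inside one of the generated reference files. This is a minimal sketch: each file is a version-1 Kerchunk reference whose `refs` mapping points Zarr keys at byte ranges in the source NetCDF file (or holds small inlined chunks)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"# Load the first generated reference file and list a few of its Zarr keys.\n",
"sample_path = sorted(os.listdir(temp_dir))[0]\n",
"with open(f\"{temp_dir}/{sample_path}\") as f:\n",
"    sample_refs = ujson.load(f)\n",
"\n",
"list(sample_refs[\"refs\"])[:5]"
]
},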
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Combine .json `Kerchunk` reference files and write a combined `Kerchunk` index\n",
"\n",
"In the following cell, we are combining all the `.json` reference files that were generated above into a single reference file and writing that file to disk."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create a list of reference json files\n",
"output_files = [\n",
" f\"{temp_dir}/{f.strip('.nc').split('/')[-1]}.json\" for f in first_24_hrs\n",
"]\n",
"\n",
"# combine individual references into single consolidated reference\n",
"mzz = MultiZarrToZarr(\n",
" output_files,\n",
" concat_dims=[\"time\"],\n",
" identical_dims=[\"y\", \"x\"],\n",
" remote_protocol=\"s3\",\n",
" remote_options={\"anon\": True},\n",
" coo_map={\"time\": \"cf:time\"},\n",
")\n",
"# save translate reference in memory for later visualization\n",
"multi_kerchunk = mzz.translate()\n",
"\n",
"# Write kerchunk .json record.\n",
"output_fname = \"ARG_combined.json\"\n",
"with open(f\"{output_fname}\", \"wb\") as f:\n",
" f.write(ujson.dumps(multi_kerchunk).encode())"
]
},
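{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, we can open the combined reference now to confirm it covers the first 24 hours before appending anything. This uses the same `kerchunk` engine options as the final section below."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# The time dimension should have 24 steps at this point.\n",
"ds24 = xr.open_dataset(\n",
"    output_fname,\n",
"    engine=\"kerchunk\",\n",
"    storage_options={\"remote_protocol\": \"s3\", \"skip_instance_cache\": True},\n",
")\n",
"ds24.sizes"
]
},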
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Append references for the next 24 hours\n",
"\n",
"We'll now append the references for the next 24 hours. First, we create an individual temporary reference file for each input data file. Then,\n",
"we load the original references and append the new references."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# First generate the individual reference files to be appended\n",
"\n",
"second_24_hrs = file_pattern[24:48]\n",
"\n",
"# Generate Dask Delayed objects\n",
"tasks = [dask.delayed(generate_json_reference)(fil, temp_dir) for fil in second_24_hrs]\n",
"\n",
"# Generate reference files for the individual NetCDF files\n",
"dask.compute(tasks)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Load the original references\n",
"fs_local = fsspec.filesystem(\"file\")\n",
"archive = ujson.load(fs_local.open(output_fname))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Create a list of individual reference files to append to the combined reference\n",
"output_files = [\n",
" f\"{temp_dir}/{f.strip('.nc').split('/')[-1]}.json\" for f in second_24_hrs\n",
"]\n",
"\n",
"# Append to the existing reference file\n",
"mzz = MultiZarrToZarr.append(\n",
" output_files,\n",
" original_refs=archive,\n",
" concat_dims=[\"time\"],\n",
" identical_dims=[\"y\", \"x\"],\n",
" remote_protocol=\"s3\",\n",
" remote_options={\"anon\": True},\n",
" coo_map={\"time\": \"cf:time\"},\n",
")\n",
"\n",
"multi_kerchunk = mzz.translate()\n",
"\n",
"# Write kerchunk .json record.\n",
"output_fname = \"ARG_combined.json\"\n",
"with open(f\"{output_fname}\", \"wb\") as f:\n",
" f.write(ujson.dumps(multi_kerchunk).encode())"
]
},
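{
"cell_type": "markdown",
"metadata": {},
"source": [
"The same pattern extends to any number of later batches: generate per-file references, reload the combined reference, and call `MultiZarrToZarr.append` again. A minimal sketch (not executed here, and assuming the remaining files share the same grid and coordinates) might look like:\n",
"\n",
"```python\n",
"for start in range(48, len(file_pattern), 24):\n",
"    batch = file_pattern[start : start + 24]\n",
"    tasks = [dask.delayed(generate_json_reference)(f, temp_dir) for f in batch]\n",
"    batch_refs = dask.compute(tasks)[0]\n",
"    archive = ujson.load(fs_local.open(output_fname))\n",
"    mzz = MultiZarrToZarr.append(\n",
"        batch_refs,\n",
"        original_refs=archive,\n",
"        concat_dims=[\"time\"],\n",
"        identical_dims=[\"y\", \"x\"],\n",
"        remote_protocol=\"s3\",\n",
"        remote_options={\"anon\": True},\n",
"        coo_map={\"time\": \"cf:time\"},\n",
"    )\n",
"    with open(output_fname, \"wb\") as f:\n",
"        f.write(ujson.dumps(mzz.translate()).encode())\n",
"```"
]
},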
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Opening Reference Dataset with Fsspec and Xarray\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"storage_options = {\n",
" \"remote_protocol\": \"s3\",\n",
" \"skip_instance_cache\": True,\n",
"} # options passed to fsspec\n",
"open_dataset_options = {\"chunks\": {}} # opens passed to xarray\n",
"\n",
"ds = xr.open_dataset(\n",
" \"ARG_combined.json\",\n",
" engine=\"kerchunk\",\n",
" storage_options=storage_options,\n",
" open_dataset_options=open_dataset_options,\n",
")\n",
"\n",
"ds"
]
},
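{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `time` dimension should now have 48 steps: the original 24 hours plus the 24 we appended."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Confirm the appended dataset spans both 24-hour blocks.\n",
"ds.sizes"
]
}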
],
"metadata": {
"kernelspec": {
"display_name": "kerchunk-cookbook",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.13"
}
},
"nbformat": 4,
"nbformat_minor": 2
}