diff --git a/_toc.yml b/_toc.yml
index 97d53f51..ea322130 100644
--- a/_toc.yml
+++ b/_toc.yml
@@ -15,6 +15,7 @@ parts:
   chapters:
     - file: notebooks/advanced/Parquet_Reference_Storage
     - file: notebooks/advanced/Pangeo_Forge
+    - file: notebooks/advanced/appending
 
 - caption: Generating Reference Files
   chapters:
diff --git a/notebooks/advanced/appending.ipynb b/notebooks/advanced/appending.ipynb
new file mode 100644
index 00000000..95be830d
--- /dev/null
+++ b/notebooks/advanced/appending.ipynb
@@ -0,0 +1,332 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Appending to Kerchunk references\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Overview\n",
+    "\n",
+    "In this tutorial we'll show how to append new data to a pre-existing Kerchunk reference. We'll use the same datasets as in the [NetCDF reference generation](../generating_references/NetCDF.ipynb) example.\n",
+    "\n",
+    "## Prerequisites\n",
+    "| Concepts | Importance | Notes |\n",
+    "| --- | --- | --- |\n",
+    "| [Kerchunk Basics](../foundations/kerchunk_basics) | Required | Core |\n",
+    "| [Multiple Files and Kerchunk](../foundations/kerchunk_multi_file) | Required | Core |\n",
+    "| [Multi-File Datasets with Kerchunk](../case_studies/ARG_Weather.ipynb) | Required | IO/Visualization |\n",
+    "\n",
+    "- **Time to learn**: 45 minutes\n",
+    "---"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Imports"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import logging\n",
+    "from tempfile import TemporaryDirectory\n",
+    "\n",
+    "import dask\n",
+    "import fsspec\n",
+    "import ujson\n",
+    "import xarray as xr\n",
+    "from distributed import Client\n",
+    "from kerchunk.combine import MultiZarrToZarr\n",
+    "from kerchunk.hdf import SingleHdf5ToZarr"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Create Input File List\n",
+    "\n",
+    "Here we use `fsspec`'s glob functionality along with the `*` wildcard operator and some string slicing to grab a list of NetCDF files from an `s3` filesystem."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Initiate an fsspec filesystem for reading\n",
+    "fs_read = fsspec.filesystem(\"s3\", anon=True, skip_instance_cache=True)\n",
+    "\n",
+    "file_paths = fs_read.glob(\n",
+    "    \"s3://smn-ar-wrf/DATA/WRF/DET/2022/12/31/12/WRFDETAR_01H_20221231_12_*\"\n",
+    ")\n",
+    "\n",
+    "# Here we prepend the prefix 's3://', which points to AWS.\n",
+    "file_pattern = sorted([\"s3://\" + f for f in file_paths])"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# This dictionary will be passed as kwargs to `fsspec`. For more details, check out the\n",
+    "# `foundations/kerchunk_basics` notebook.\n",
+    "so = dict(mode=\"rb\", anon=True, default_fill_cache=False, default_cache_type=\"first\")\n",
+    "\n",
+    "# We are creating a temporary directory to store the .json reference files.\n",
+    "# Alternatively, you could write these to cloud storage.\n",
+    "td = TemporaryDirectory()\n",
+    "temp_dir = td.name"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Start a Dask Client\n",
+    "\n",
+    "To parallelize the creation of our reference files, we will use `Dask`. For a detailed guide on how to use Dask and Kerchunk, see the Foundations notebook: [Kerchunk and Dask](../foundations/kerchunk_dask).\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "client = Client(n_workers=8, silence_logs=logging.ERROR)\n",
+    "client"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Create a `Kerchunk` reference file for the first 24 hours"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "first_24_hrs = file_pattern[:24]"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Use Kerchunk's `SingleHdf5ToZarr` method to create a `Kerchunk` index from\n",
+    "# a NetCDF file.\n",
+    "\n",
+    "\n",
+    "def generate_json_reference(fil, output_dir: str):\n",
+    "    with fs_read.open(fil, **so) as infile:\n",
+    "        h5chunks = SingleHdf5ToZarr(infile, fil, inline_threshold=300)\n",
+    "        # Use `removesuffix` rather than `strip`: `strip('.nc')` removes a *set\n",
+    "        # of characters* from both ends, not just the '.nc' suffix.\n",
+    "        fname = fil.split(\"/\")[-1].removesuffix(\".nc\")\n",
+    "        outf = f\"{output_dir}/{fname}.json\"\n",
+    "        with open(outf, \"wb\") as f:\n",
+    "            f.write(ujson.dumps(h5chunks.translate()).encode())\n",
+    "        return outf\n",
+    "\n",
+    "\n",
+    "# Generate Dask Delayed objects\n",
+    "tasks = [dask.delayed(generate_json_reference)(fil, temp_dir) for fil in first_24_hrs]"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Start parallel processing\n",
+    "import warnings\n",
+    "\n",
+    "warnings.filterwarnings(\"ignore\")\n",
+    "dask.compute(tasks)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Combine .json `Kerchunk` reference files and write a combined `Kerchunk` index\n",
+    "\n",
+    "In the following cell, we combine all the `.json` reference files that were generated above into a single reference file and write that file to disk."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Create a list of reference json files\n",
+    "output_files = [\n",
+    "    f\"{temp_dir}/{f.split('/')[-1].removesuffix('.nc')}.json\" for f in first_24_hrs\n",
+    "]\n",
+    "\n",
+    "# Combine individual references into a single consolidated reference\n",
+    "mzz = MultiZarrToZarr(\n",
+    "    output_files,\n",
+    "    concat_dims=[\"time\"],\n",
+    "    identical_dims=[\"y\", \"x\"],\n",
+    "    remote_protocol=\"s3\",\n",
+    "    remote_options={\"anon\": True},\n",
+    "    coo_map={\"time\": \"cf:time\"},\n",
+    ")\n",
+    "# Save the translated reference in memory for later use\n",
+    "multi_kerchunk = mzz.translate()\n",
+    "\n",
+    "# Write kerchunk .json record\n",
+    "output_fname = \"ARG_combined.json\"\n",
+    "with open(output_fname, \"wb\") as f:\n",
+    "    f.write(ujson.dumps(multi_kerchunk).encode())"
+   ]
+  },
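+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Before appending, it's worth a quick sanity check that the combined reference looks right. The next cell is optional and can be skipped: it reopens the reference we just wrote, using the same `engine=\"kerchunk\"` pattern as the final section of this notebook, and asserts that the `time` dimension has 24 steps. The name `ds_check` is just an illustrative choice."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Optional sanity check: the combined reference should cover exactly the\n",
+    "# first 24 hourly files before we append the next batch.\n",
+    "ds_check = xr.open_dataset(\n",
+    "    output_fname,\n",
+    "    engine=\"kerchunk\",\n",
+    "    storage_options={\"remote_protocol\": \"s3\", \"skip_instance_cache\": True},\n",
+    ")\n",
+    "assert ds_check.sizes[\"time\"] == 24"
+   ]
+  },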
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Append references for the next 24 hours\n",
+    "\n",
+    "We'll now append the references for the next 24 hours. First, we create an individual temporary reference file for each input data file. Then we load the original combined reference and append the new references to it."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# First generate the individual reference files to be appended\n",
+    "\n",
+    "second_24_hrs = file_pattern[24:48]\n",
+    "\n",
+    "# Generate Dask Delayed objects\n",
+    "tasks = [dask.delayed(generate_json_reference)(fil, temp_dir) for fil in second_24_hrs]\n",
+    "\n",
+    "# Generate reference files for the individual NetCDF files\n",
+    "dask.compute(tasks)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Load the original references\n",
+    "fs_local = fsspec.filesystem(\"file\")\n",
+    "archive = ujson.load(fs_local.open(output_fname))"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Create a list of individual reference files to append to the combined reference\n",
+    "output_files = [\n",
+    "    f\"{temp_dir}/{f.split('/')[-1].removesuffix('.nc')}.json\" for f in second_24_hrs\n",
+    "]\n",
+    "\n",
+    "# Append to the existing reference file\n",
+    "mzz = MultiZarrToZarr.append(\n",
+    "    output_files,\n",
+    "    original_refs=archive,\n",
+    "    concat_dims=[\"time\"],\n",
+    "    identical_dims=[\"y\", \"x\"],\n",
+    "    remote_protocol=\"s3\",\n",
+    "    remote_options={\"anon\": True},\n",
+    "    coo_map={\"time\": \"cf:time\"},\n",
+    ")\n",
+    "\n",
+    "multi_kerchunk = mzz.translate()\n",
+    "\n",
+    "# Overwrite the combined kerchunk .json record\n",
+    "with open(output_fname, \"wb\") as f:\n",
+    "    f.write(ujson.dumps(multi_kerchunk).encode())"
+   ]
+  },
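+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "As another optional check, we can peek at the consolidated reference itself before opening it. `translate()` returns a plain dict in Kerchunk's version 1 reference layout: a small header plus a flat mapping of chunk keys under `\"refs\"`, so after appending, the mapping should contain entries for all 48 time steps rather than 24."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# The appended reference set is an ordinary dict: a version header plus a\n",
+    "# flat mapping of chunk keys under \"refs\".\n",
+    "print(multi_kerchunk[\"version\"])\n",
+    "print(len(multi_kerchunk[\"refs\"]), \"reference entries after appending\")"
+   ]
+  },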
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Opening the Reference Dataset with Fsspec and Xarray\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "storage_options = {\n",
+    "    \"remote_protocol\": \"s3\",\n",
+    "    \"skip_instance_cache\": True,\n",
+    "}  # options passed to fsspec\n",
+    "open_dataset_options = {\"chunks\": {}}  # options passed to xarray\n",
+    "\n",
+    "ds = xr.open_dataset(\n",
+    "    \"ARG_combined.json\",\n",
+    "    engine=\"kerchunk\",\n",
+    "    storage_options=storage_options,\n",
+    "    open_dataset_options=open_dataset_options,\n",
+    ")\n",
+    "\n",
+    "ds"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "kerchunk-cookbook",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.10.13"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}