Commit a219fd6

Merge 4b673e2 into f4d5d70
2 parents f4d5d70 + 4b673e2 commit a219fd6

2 files changed: +333 -0 lines changed

_toc.yml

Lines changed: 1 addition & 0 deletions
@@ -15,6 +15,7 @@ parts:
   chapters:
     - file: notebooks/advanced/Parquet_Reference_Storage
     - file: notebooks/advanced/Pangeo_Forge
+    - file: notebooks/advanced/appending

 - caption: Generating Reference Files
   chapters:

notebooks/advanced/appending.ipynb

Lines changed: 332 additions & 0 deletions
@@ -0,0 +1,332 @@
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Appending to Kerchunk references\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Overview\n",
    "\n",
    "In this tutorial we'll show how to append to a pre-existing Kerchunk reference. We'll use the same datasets as in the [NetCDF reference generation](../generating_references/NetCDF.ipynb) example.\n",
    "\n",
    "## Prerequisites\n",
    "| Concepts | Importance | Notes |\n",
    "| --- | --- | --- |\n",
    "| [Kerchunk Basics](../foundations/kerchunk_basics) | Required | Core |\n",
    "| [Multiple Files and Kerchunk](../foundations/kerchunk_multi_file) | Required | Core |\n",
    "| [Multi-File Datasets with Kerchunk](../case_studies/ARG_Weather.ipynb) | Required | IO/Visualization |\n",
    "\n",
    "- **Time to learn**: 45 minutes\n",
    "---"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Imports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import logging\n",
    "from tempfile import TemporaryDirectory\n",
    "\n",
    "import dask\n",
    "import fsspec\n",
    "import ujson\n",
    "import xarray as xr\n",
    "from distributed import Client\n",
    "from kerchunk.combine import MultiZarrToZarr\n",
    "from kerchunk.hdf import SingleHdf5ToZarr"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Input File List\n",
    "\n",
    "Here we use `fsspec`'s glob functionality along with the `*` wildcard operator, then prepend the `s3://` protocol, to build a sorted list of NetCDF files from an `s3` `fsspec` filesystem."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Initiate fsspec filesystems for reading\n",
    "fs_read = fsspec.filesystem(\"s3\", anon=True, skip_instance_cache=True)\n",
    "\n",
    "files_paths = fs_read.glob(\n",
    "    \"s3://smn-ar-wrf/DATA/WRF/DET/2022/12/31/12/WRFDETAR_01H_20221231_12_*\"\n",
    ")\n",
    "\n",
    "# Here we prepend the prefix 's3://', which points to AWS.\n",
    "file_pattern = sorted([\"s3://\" + f for f in files_paths])"
   ]
  },
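  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Before generating any references, it can help to confirm the glob matched what we expect. The next cell is an optional sanity check, not part of the core workflow; it assumes `file_pattern` was built as above and that at least 48 hourly files exist (the notebook uses the first two 24-hour batches), and simply inspects the count and the first URL."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional sanity check (assumes the hourly file layout above): the rest of\n",
    "# this notebook slices the first 48 files, so we expect at least that many.\n",
    "print(len(file_pattern))\n",
    "print(file_pattern[0])"
   ]
  },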
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# This dictionary will be passed as kwargs to `fsspec`. For more details, check out the\n",
    "# `foundations/kerchunk_basics` notebook.\n",
    "so = dict(mode=\"rb\", anon=True, default_fill_cache=False, default_cache_type=\"first\")\n",
    "\n",
    "# We are creating a temporary directory to store the .json reference files.\n",
    "# Alternatively, you could write these to cloud storage.\n",
    "td = TemporaryDirectory()\n",
    "temp_dir = td.name"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Start a Dask Client\n",
    "\n",
    "To parallelize the creation of our reference files, we will use `Dask`. For a detailed guide on how to use Dask and Kerchunk, see the Foundations notebook: [Kerchunk and Dask](../foundations/kerchunk_dask).\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "client = Client(n_workers=8, silence_logs=logging.ERROR)\n",
    "client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a `Kerchunk` reference file for the first 24 hours"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "first_24_hrs = file_pattern[:24]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Use Kerchunk's `SingleHdf5ToZarr` method to create a `Kerchunk` index from\n",
    "# a NetCDF file.\n",
    "\n",
    "\n",
    "def generate_json_reference(fil, output_dir: str):\n",
    "    with fs_read.open(fil, **so) as infile:\n",
    "        h5chunks = SingleHdf5ToZarr(infile, fil, inline_threshold=300)\n",
    "        fname = fil.split(\"/\")[-1].removesuffix(\".nc\")\n",
    "        outf = f\"{output_dir}/{fname}.json\"\n",
    "        with open(outf, \"wb\") as f:\n",
    "            f.write(ujson.dumps(h5chunks.translate()).encode())\n",
    "        return outf\n",
    "\n",
    "\n",
    "# Generate Dask Delayed objects\n",
    "tasks = [dask.delayed(generate_json_reference)(fil, temp_dir) for fil in first_24_hrs]"
   ]
  },
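  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As an optional extra step, you can exercise the helper serially on a single file before dispatching all 24 tasks; a failure here (for example an S3 permission or HDF5 read error) surfaces with a much clearer traceback than it would inside a Dask worker. This cell is a sketch assuming `generate_json_reference`, `first_24_hrs`, and `temp_dir` as defined above."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional: run the helper once, serially, to catch obvious errors early.\n",
    "test_ref = generate_json_reference(first_24_hrs[0], temp_dir)\n",
    "print(test_ref)"
   ]
  },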
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Start parallel processing\n",
    "import warnings\n",
    "\n",
    "warnings.filterwarnings(\"ignore\")\n",
    "dask.compute(tasks)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Combine .json `Kerchunk` reference files and write a combined `Kerchunk` index\n",
    "\n",
    "In the following cell, we combine all of the `.json` reference files generated above into a single reference file and write it to disk."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create a list of reference .json files\n",
    "output_files = [\n",
    "    f\"{temp_dir}/{f.split('/')[-1].removesuffix('.nc')}.json\" for f in first_24_hrs\n",
    "]\n",
    "\n",
    "# Combine the individual references into a single consolidated reference\n",
    "mzz = MultiZarrToZarr(\n",
    "    output_files,\n",
    "    concat_dims=[\"time\"],\n",
    "    identical_dims=[\"y\", \"x\"],\n",
    "    remote_protocol=\"s3\",\n",
    "    remote_options={\"anon\": True},\n",
    "    coo_map={\"time\": \"cf:time\"},\n",
    ")\n",
    "# Save the translated reference in memory for later use\n",
    "multi_kerchunk = mzz.translate()\n",
    "\n",
    "# Write the kerchunk .json record.\n",
    "output_fname = \"ARG_combined.json\"\n",
    "with open(f\"{output_fname}\", \"wb\") as f:\n",
    "    f.write(ujson.dumps(multi_kerchunk).encode())"
   ]
  },
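  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Before appending, it can help to confirm the combined reference looks right. The next cell is an optional sketch: it opens the in-memory `multi_kerchunk` dict through `fsspec`'s `reference://` protocol with the Zarr engine and checks that the `time` dimension spans the first 24 hourly steps (assuming the hourly file layout above)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional check: open the in-memory combined reference and confirm it\n",
    "# covers the first 24 hourly time steps (an assumption of this tutorial's\n",
    "# file layout, not something Kerchunk enforces).\n",
    "ds24 = xr.open_dataset(\n",
    "    \"reference://\",\n",
    "    engine=\"zarr\",\n",
    "    backend_kwargs={\n",
    "        \"consolidated\": False,\n",
    "        \"storage_options\": {\n",
    "            \"fo\": multi_kerchunk,\n",
    "            \"remote_protocol\": \"s3\",\n",
    "            \"remote_options\": {\"anon\": True},\n",
    "        },\n",
    "    },\n",
    ")\n",
    "print(ds24.sizes[\"time\"])  # expected: 24"
   ]
  },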
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Append references for the next 24 hours\n",
    "\n",
    "We'll now append the references for the next 24 hours. First, we create an individual temporary reference file for each input data file. Then,\n",
    "we load the original references and append the new references."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# First generate the individual reference files to be appended\n",
    "\n",
    "second_24_hrs = file_pattern[24:48]\n",
    "\n",
    "# Generate Dask Delayed objects\n",
    "tasks = [dask.delayed(generate_json_reference)(fil, temp_dir) for fil in second_24_hrs]\n",
    "\n",
    "# Generate reference files for the individual NetCDF files\n",
    "dask.compute(tasks)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Load the original references\n",
    "fs_local = fsspec.filesystem(\"file\")\n",
    "archive = ujson.load(fs_local.open(output_fname))"
   ]
  },
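  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A loaded Kerchunk reference is a plain dict with a `version` field and a `refs` mapping of Zarr keys to inlined data or byte ranges. The optional peek below shows the size of the mapping we are about to append to."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional: peek at the structure of the loaded reference dict.\n",
    "print(archive[\"version\"])\n",
    "print(len(archive[\"refs\"]))"
   ]
  },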
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create a list of individual reference files to append to the combined reference\n",
    "output_files = [\n",
    "    f\"{temp_dir}/{f.split('/')[-1].removesuffix('.nc')}.json\" for f in second_24_hrs\n",
    "]\n",
    "\n",
    "# Append to the existing reference file\n",
    "mzz = MultiZarrToZarr.append(\n",
    "    output_files,\n",
    "    original_refs=archive,\n",
    "    concat_dims=[\"time\"],\n",
    "    identical_dims=[\"y\", \"x\"],\n",
    "    remote_protocol=\"s3\",\n",
    "    remote_options={\"anon\": True},\n",
    "    coo_map={\"time\": \"cf:time\"},\n",
    ")\n",
    "\n",
    "multi_kerchunk = mzz.translate()\n",
    "\n",
    "# Write the appended kerchunk .json record.\n",
    "output_fname = \"ARG_combined.json\"\n",
    "with open(f\"{output_fname}\", \"wb\") as f:\n",
    "    f.write(ujson.dumps(multi_kerchunk).encode())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Opening the Reference Dataset with fsspec and Xarray\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "storage_options = {\n",
    "    \"remote_protocol\": \"s3\",\n",
    "    \"skip_instance_cache\": True,\n",
    "}  # options passed to fsspec\n",
    "open_dataset_options = {\"chunks\": {}}  # options passed to xarray\n",
    "\n",
    "ds = xr.open_dataset(\n",
    "    \"ARG_combined.json\",\n",
    "    engine=\"kerchunk\",\n",
    "    storage_options=storage_options,\n",
    "    open_dataset_options=open_dataset_options,\n",
    ")\n",
    "\n",
    "ds"
   ]
  },
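  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a final, optional sanity check (assuming the hourly file layout above), the appended dataset should now span 48 hourly time steps: 24 from the original reference plus the 24 we appended."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# 24 original + 24 appended = 48 hourly time steps (an assumption of this\n",
    "# tutorial's file layout).\n",
    "assert ds.sizes[\"time\"] == 48\n",
    "ds.time"
   ]
  },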
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "kerchunk-cookbook",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.13"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
