Cloud-optimized access to CESM Timeseries netCDF files in the Cloud with kerchunk
Summary
We benchmark reading a subset of the CESM2 Large Ensemble (CESM2-LE), stored as a collection of netCDF files in the cloud (Amazon S3 / AWS), from NCAR's Casper cluster. We use daily output from a single ensemble member of the historical experiment spanning 1850 to 2014, with a total dataset size of 600+ GB across 17 netCDF4 files.
We read the data in two ways:

1. accessing the netCDF files directly, and
2. using kerchunk as an intermediate layer to enable cloud-optimized access.
| File Access Method | Dataset Lazy Read-In Wall Time | Visualization Wall Time |
|---|---|---|
| s3 netCDF | 9.92 s | 17 min 12 s |
| kerchunk | 484 ms | 44.2 s |
The "s3 netCDF" row refers to accessing the data remotely using the standard S3 API, the interface to Amazon's cloud object storage. Google and Microsoft offer equivalent services.
kerchunk is a relatively new package, developed within the fsspec Python community, that aims to improve I/O performance when reading cloud-hosted datasets that are not necessarily in a "cloud-optimized" format (e.g. netCDF vs. Zarr); netCDF was engineered to be performant on regular POSIX filesystems, not on object stores.
Note that you would usually run such benchmarks and analysis on a cloud compute instance. The performance benefits of kerchunk extend to that use case, though the gap will be smaller (but still significant). For more, see this post and this post.
Tip
The Project Pythia Cookbook on kerchunk is a great resource!
Imports
%load_ext watermark
import glob
import os
import cftime
import dask
import fsspec
import holoviews as hv
import hvplot.xarray
import kerchunk
import s3fs
import ujson # fast json
import xarray as xr
from distributed import Client
from kerchunk.combine import MultiZarrToZarr
from kerchunk.hdf import SingleHdf5ToZarr
from ncar_jobqueue import NCARCluster
hv.extension('bokeh')
%watermark -iv
xarray : 2023.2.0
ujson : 5.7.0
json : 2.0.9
dask : 2023.3.0
kerchunk : 0.1.0
s3fs : 2023.3.0
hvplot : 0.8.2
cftime : 1.6.2
fsspec : 2023.3.0
holoviews: 1.15.4
sys : 3.11.0 | packaged by conda-forge | (main, Jan 14 2023, 12:27:40) [GCC 11.3.0]
Gather a list of files available from an Amazon S3 bucket
As mentioned previously, S3 is Amazon's object storage service. For this example, I copied a single ensemble member's worth of daily historical timeseries output from the CESM2-LE dataset to an S3 bucket (ncar-cesm2-lens).
We use fsspec to create a filesystem-like representation:
# anonymous (public) access; skip_instance_cache avoids reusing a previously cached filesystem instance
fs = fsspec.filesystem('s3', anon=True, skip_instance_cache=True)
filelist = fs.glob('ncar-cesm2-lens/tseries/*nc')
filelist
['ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.18500101-18591231.nc',
'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.18600101-18691231.nc',
'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.18700101-18791231.nc',
'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.18800101-18891231.nc',
'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.18900101-18991231.nc',
'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19000101-19091231.nc',
'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19100101-19191231.nc',
'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19200101-19291231.nc',
'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19300101-19391231.nc',
'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19400101-19491231.nc',
'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19500101-19591231.nc',
'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19600101-19691231.nc',
'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19700101-19791231.nc',
'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19800101-19891231.nc',
'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19900101-19991231.nc',
'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.20000101-20091231.nc',
'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.20100101-20141231.nc']
We can check the size of a single file:
print(fs.size(filelist[0]) / 1e9, 'gb')
37.649093521 gb
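We can also tally up the whole collection with the same filesystem object; a quick sketch (it just sums the object sizes reported by S3):

# Rough total size of the collection
total_bytes = sum(fs.size(f) for f in filelist)
print(f"{total_bytes / 1e9:.1f} GB across {len(filelist)} files")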
We need to add the s3:// prefix to specify that these paths refer to the S3 filesystem.
Note
We'll use only the last 5 files to reduce access time.
filelist = filelist[-5:]
urls = ["s3://" + f for f in filelist]
# storage options used when opening each remote file
so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')
Approach
kerchunk works by first reading a "reference" file that maps virtual chunks of data to byte ranges in specific files. This "reference" file looks like a Zarr dataset and is read using the Zarr library. Previously, we demonstrated the use of kerchunk to create virtual aggregate datasets without modifying the files on disk.
Here, the goal is to demonstrate cloud-optimized data access enabled by using Zarr as an intermediate layer to access underlying netCDF files.
The approach is similar:

1. We generate a "reference" JSON file, effectively a Zarr representation, for each netCDF file. kerchunk provides a helper function for this; since netCDF4 files are HDF5 files, the relevant helper is kerchunk.hdf.SingleHdf5ToZarr. A sketch of what such a reference file looks like follows this list.
2. We combine these into a single reference file representing the aggregate dataset with MultiZarrToZarr.
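For orientation, a kerchunk reference file is just JSON that mirrors the layout of a Zarr store: metadata keys hold (possibly inlined) Zarr metadata, while chunk keys map to [url, offset, length] byte ranges inside the original netCDF file. A minimal illustrative sketch (the file path, byte offsets, and abbreviated metadata below are made up, not taken from this dataset):

# Illustrative only -- the general shape of a version-1 kerchunk reference.
# Real references are generated by SingleHdf5ToZarr / MultiZarrToZarr, not written by hand.
example_reference = {
    "version": 1,
    "refs": {
        ".zgroup": '{"zarr_format": 2}',                    # Zarr group metadata, inlined as a string
        "T/.zarray": '{"shape": [...], "chunks": [...]}',   # array metadata (abbreviated here)
        "T/0.0.0.0": ["s3://some-bucket/some-file.nc", 87121, 2157391],  # chunk -> byte range in the netCDF file
    },
}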
Generate the Mapper Files
Set the output directory for the JSON mapping files:
json_dir = 'cesm2_le_reference_files'
os.makedirs(json_dir, exist_ok=True)  # make sure the output directory exists
We define a helper function that generates a reference file for a given URL:
def gen_json(u):
    # open the remote netCDF file and translate its HDF5 chunk layout into a kerchunk reference
    with fs.open(u, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, u, inline_threshold=300)
        # name the output JSON after the netCDF file it references
        fname = u.split('/')[-1]
        outf = f'{fname}.json'
        print(outf)
        with open(f"{json_dir}/{outf}", 'wb') as f:
            f.write(ujson.dumps(h5chunks.translate()).encode())
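If you want to sanity-check the helper before spinning up a cluster, you can run it on a single file (a sketch; this reads the metadata of one ~38 GB remote file, so expect it to take a little while):

# Optional smoke test: generate the reference for just the first file
gen_json(urls[0])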
Spin up a Cluster
We will spin up a Dask cluster for distributed/parallel computing. Here we request 5 workers, each with 25 GB of RAM and a single processor.
cluster = NCARCluster()
cluster.scale(5)
client = Client(cluster)
client
/glade/u/home/dcherian/miniconda3/envs/os-obs/lib/python3.11/site-packages/dask_jobqueue/core.py:255: FutureWarning: job_extra has been renamed to job_extra_directives. You are still using it (even if only set to []; please also check config files). If you did not set job_extra_directives yet, job_extra will be respected for now, but it will be removed in a future release. If you already set job_extra_directives, job_extra is ignored and you can remove it.
warnings.warn(warn, FutureWarning)
/glade/u/home/dcherian/miniconda3/envs/os-obs/lib/python3.11/site-packages/dask_jobqueue/core.py:274: FutureWarning: env_extra has been renamed to job_script_prologue. You are still using it (even if only set to []; please also check config files). If you did not set job_script_prologue yet, env_extra will be respected for now, but it will be removed in a future release. If you already set job_script_prologue, env_extra is ignored and you can remove it.
warnings.warn(warn, FutureWarning)
/glade/u/home/dcherian/miniconda3/envs/os-obs/lib/python3.11/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 43962 instead
warnings.warn(
Client: Client-ccd4a03a-c33f-11ed-a1ea-3cecef1b11fa
Connection method: Cluster object | Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/dcherian/proxy/43962/status
PBSCluster (e1214b55) -- Workers: 0 | Total threads: 0 | Total memory: 0 B (the PBS jobs have not started yet)
Scheduler: tcp://10.12.206.54:37597 | Started: Just now
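If you are working somewhere without access to NCAR's PBS queues, a local Dask cluster works just as well for this step. A sketch, assuming your machine has enough cores and memory, to use instead of the NCARCluster cell above:

# Alternative for non-NCAR environments: a local Dask cluster
from distributed import Client, LocalCluster

cluster = LocalCluster(n_workers=5)
client = Client(cluster)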
Create a reference file for each netCDF file
We can compute these reference files in parallel!
%%time
dask.compute(*[dask.delayed(gen_json)(u) for u in urls], retries=10);
CPU times: user 7.47 s, sys: 580 ms, total: 8.05 s
Wall time: 3min 25s
Merge the reference files together
The previous step produced one reference file per netCDF file; let's merge these together to create a single store.
reference_files = sorted(glob.glob('cesm2_le_reference_files/*'))
reference_files
['cesm2_le_reference_files/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19700101-19791231.nc.json',
'cesm2_le_reference_files/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19800101-19891231.nc.json',
'cesm2_le_reference_files/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19900101-19991231.nc.json',
'cesm2_le_reference_files/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.20000101-20091231.nc.json',
'cesm2_le_reference_files/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.20100101-20141231.nc.json']
Now we can generate the single combined store using MultiZarrToZarr. We pass a few extra arguments describing how to read the reference JSONs, how to reach the remote netCDF files, and which dimension to concatenate along.
mzz = MultiZarrToZarr(
    reference_files,
    target_options={'anon': False},   # options for opening the (local) reference JSONs
    remote_protocol='s3',             # the underlying netCDF files live on S3
    remote_options={'anon': True},    # anonymous access to the public bucket
    concat_dims="time",               # concatenate along the time dimension
)
After merging our reference files together, we can write out the combined JSON.
%%time
#%%prun -D multizarr_profile
mzz.translate('cesm2-le-tseries-aws.json')
CPU times: user 2.27 s, sys: 578 ms, total: 2.85 s
Wall time: 3.29 s
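As a quick sanity check (a sketch, not required), we can load the combined JSON back in and count how many reference keys it contains:

# Optional: peek at the combined reference file we just wrote
with open('cesm2-le-tseries-aws.json') as f:
    combined = ujson.load(f)
print(f"{len(combined['refs'])} keys in the combined reference")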
Read directly from S3 (without kerchunk)
We went through all that work above; now let's look at what the performance would be without the kerchunk package.
To open the files, we use s3fs (an fsspec implementation for S3) and open each URL as a file-like object.
fs_s3 = s3fs.S3FileSystem(anon=True)
flist = [fs_s3.open(path, mode="rb") for path in urls]
Now read the files in with open_mfdataset.

Warning

parallel=True does not work at the moment.
Our wall time for lazily reading in the datasets isn’t too bad…
%%time
ds_read_directly = xr.open_mfdataset(
flist,
engine="h5netcdf",
chunks={"lat": 20, "lon": 20},
data_vars="minimal",
coords="minimal",
compat="override",
)
CPU times: user 2.69 s, sys: 310 ms, total: 3 s
Wall time: 9.92 s
ds_read_directly
<xarray.Dataset>
Dimensions:        (lat: 192, lon: 288, zlon: 1, nbnd: 2, lev: 32, ilev: 33, time: 16426)
Coordinates:
  * lat            (lat) float64 -90.0 -89.06 -88.12 -87.17 ... 88.12 89.06 90.0
  * lon            (lon) float64 0.0 1.25 2.5 3.75 ... 355.0 356.2 357.5 358.8
  * zlon           (zlon) float64 0.0
  * lev            (lev) float64 3.643 7.595 14.36 24.61 ... 957.5 976.3 992.6
  * ilev           (ilev) float64 2.255 5.032 10.16 18.56 ... 967.5 985.1 1e+03
  * time           (time) object 1970-01-01 00:00:00 ... 2015-01-01 00:00:00
Dimensions without coordinates: nbnd
Data variables: (12/27)
    gw             (lat) float64 dask.array<chunksize=(20,), meta=np.ndarray>
    zlon_bnds      (zlon, nbnd) float64 dask.array<chunksize=(1, 2), meta=np.ndarray>
    hyam           (lev) float64 dask.array<chunksize=(32,), meta=np.ndarray>
    hybm           (lev) float64 dask.array<chunksize=(32,), meta=np.ndarray>
    P0             float64 ...
    hyai           (ilev) float64 dask.array<chunksize=(33,), meta=np.ndarray>
    ...             ...
    n2ovmr         (time) float64 dask.array<chunksize=(3650,), meta=np.ndarray>
    f11vmr         (time) float64 dask.array<chunksize=(3650,), meta=np.ndarray>
    f12vmr         (time) float64 dask.array<chunksize=(3650,), meta=np.ndarray>
    sol_tsi        (time) float64 dask.array<chunksize=(3650,), meta=np.ndarray>
    nsteph         (time) float64 dask.array<chunksize=(3650,), meta=np.ndarray>
    T              (time, lev, lat, lon) float64 dask.array<chunksize=(3650, 32, 20, 20), meta=np.ndarray>
Attributes:
    Conventions:       CF-1.0
    source:            CAM
    case:              b.e21.BHISTcmip6.f09_g17.LE2-1001.001
    logname:           sunseon
    host:              mom2
    initial_file:      b.e21.B1850.f09_g17.CMIP6-piControl.001.cam.i.1001-01-...
    topography_file:   /mnt/lustre/share/CESM/cesm_input/atm/cam/topo/fv_0.9x...
    model_doi_url:     https://doi.org/10.5065/D67H1H0V
    time_period_freq:  day_1
But, if we actually load in some data to plot, you’ll see that it is pretty slow…
%%time
ds_read_directly.T.isel(time=range(1460), lev=0, lat=0, lon=0).hvplot(x='time', grid=True)
WARNING:param.CurvePlot00876: Converting cftime.datetime from a non-standard calendar (noleap) to a standard calendar for plotting. This may lead to subtle errors in formatting dates, for accurate tick formatting switch to the matplotlib backend.
CPU times: user 42.7 s, sys: 3.07 s, total: 45.8 s
Wall time: 17min 12s
Benchmark reading from the kerchunk mappings
Now we can try reading the data using the combined reference file (created with kerchunk).
fs = fsspec.filesystem(
    "reference",
    fo='cesm2-le-tseries-aws.json',   # the combined reference file we just created
    remote_protocol='s3',             # the referenced byte ranges live on S3
    remote_options={'anon': True},
)
m = fs.get_mapper("")
Read the files in as Zarr instead of netCDF (remember, the reference file is a virtual Zarr dataset).
%%time
ds = xr.open_dataset(m, engine="zarr")
ds
<timed exec>:1: RuntimeWarning: Failed to open Zarr store with consolidated metadata, but successfully read with non-consolidated metadata. This is typically much slower for opening a dataset. To silence this warning, consider:
1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
CPU times: user 212 ms, sys: 22.3 ms, total: 234 ms
Wall time: 484 ms
<xarray.Dataset>
Dimensions:        (time: 16426, lev: 32, lat: 192, lon: 288, ilev: 33, nbnd: 2, zlon: 1)
Coordinates:
  * ilev           (ilev) float64 2.255 5.032 10.16 18.56 ... 967.5 985.1 1e+03
  * lat            (lat) float64 -90.0 -89.06 -88.12 -87.17 ... 88.12 89.06 90.0
  * lev            (lev) float64 3.643 7.595 14.36 24.61 ... 957.5 976.3 992.6
  * lon            (lon) float64 0.0 1.25 2.5 3.75 ... 355.0 356.2 357.5 358.8
  * time           (time) object 1970-01-01 00:00:00 ... 2015-01-01 00:00:00
  * zlon           (zlon) float64 0.0
Dimensions without coordinates: nbnd
Data variables: (12/27)
    P0             (time) float64 ...
    T              (time, lev, lat, lon) float64 ...
    ch4vmr         (time) float64 ...
    co2vmr         (time) float64 ...
    date           (time) float64 ...
    date_written   (time) object ...
    ...             ...
    nscur          (time) float64 ...
    nsteph         (time) float64 ...
    sol_tsi        (time) float64 ...
    time_bnds      (time, nbnd) object ...
    time_written   (time) object ...
    zlon_bnds      (time, zlon, nbnd) float64 ...
Attributes:
    Conventions:       CF-1.0
    case:              b.e21.BHISTcmip6.f09_g17.LE2-1001.001
    host:              mom2
    initial_file:      b.e21.B1850.f09_g17.CMIP6-piControl.001.cam.i.1001-01-...
    logname:           sunseon
    model_doi_url:     https://doi.org/10.5065/D67H1H0V
    source:            CAM
    time_period_freq:  day_1
    topography_file:   /mnt/lustre/share/CESM/cesm_input/atm/cam/topo/fv_0.9x...
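As an aside, the filesystem/mapper creation and the xr.open_dataset call can be collapsed into a single step by passing a reference:// URL and storage options straight to the Zarr backend. A sketch (exact keyword handling may vary with your xarray and zarr versions):

# One-step alternative: let the zarr backend build the reference filesystem itself
ds = xr.open_dataset(
    "reference://",
    engine="zarr",
    backend_kwargs={
        "consolidated": False,
        "storage_options": {
            "fo": "cesm2-le-tseries-aws.json",
            "remote_protocol": "s3",
            "remote_options": {"anon": True},
        },
    },
)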
Plot the output from the Mapper Method
%%time
ds.T.isel(time=range(1460), lev=0, lat=0, lon=0).hvplot(x='time', grid=True)
WARNING:param.CurvePlot00983: Converting cftime.datetime from a non-standard calendar (noleap) to a standard calendar for plotting. This may lead to subtle errors in formatting dates, for accurate tick formatting switch to the matplotlib backend.
CPU times: user 18 s, sys: 2.09 s, total: 20.1 s
Wall time: 44.2 s
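Finally, once the benchmarks are done, it is good practice to shut the cluster down; a short sketch:

# Release the workers and the PBS jobs backing them
client.close()
cluster.close()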