Cloud-optimized access to CESM Timeseries netCDF files in the Cloud with kerchunk#

Summary#

We benchmark reading a subset of the CESM2-Large Ensemble stored as a collection of netCDF files on the cloud (Amazon / AWS) from Casper. We use a single ensemble member historical experiment with daily data from 1850 to 2009, with a total dataset size of 600+ GB, from 13 netCDF4 files.

We read in two ways:

  • accessing the netCDF files directly

  • using kerchunk as an intermediate layer to enable cloud-optimized access.

File Access Method

Dataset Lazy Read In Wall Time

Visualization Wall Time

s3 netcdf

8-9 s

6 min 21 s

kerchunk

460 ms

43.4 s

The “s3 netcdf” row refers to accessing the data remotely using the traditional s3 api, the interface with Amazon’s cloud storage. There are Google and Microsoft equivalents of this

kerchunk is a new package, developed within the fsspec python community, which aims to improve I/O performance when reading from cloud hosted datasets which are not neccessarily in a “cloud optimized format (ex. netCDF vs. Zarr) where netCDF was engineered to performant on regular POSIX filesystems.

Note that usually you would want to execute some benchmarks and analysis on a cloud instance. The performance benefits here extend to that use case but the gap will be smaller (but still significant). For more see this post and this post.

Tip

The Project Pythia Cookbook on kerchunk is a great resource!

Imports#

%load_ext watermark

import glob
import os

import cftime
import dask
import fsspec
import holoviews as hv
import hvplot.xarray
import kerchunk
import s3fs
import ujson  # fast json
import xarray as xr
from distributed import Client
from kerchunk.combine import MultiZarrToZarr
from kerchunk.hdf import SingleHdf5ToZarr
from ncar_jobqueue import NCARCluster

hv.extension('bokeh')

%watermark -iv
xarray   : 2023.2.0
ujson    : 5.7.0
json     : 2.0.9
dask     : 2023.3.0
kerchunk : 0.1.0
s3fs     : 2023.3.0
hvplot   : 0.8.2
cftime   : 1.6.2
fsspec   : 2023.3.0
holoviews: 1.15.4
sys      : 3.11.0 | packaged by conda-forge | (main, Jan 14 2023, 12:27:40) [GCC 11.3.0]

Gather a list of files available from an Amazon s3 bucket#

As mentioned previously, s3 is Amazon’s object store file system. For this example, I moved a single ensemble member’s worth of daily historical timeseries output from the CESM2-LE dataset.

We use fsspec to create a filesystem-like representation:

fs = fsspec.filesystem('s3', anon=True, skip_instance_cache=True)
filelist = fs.glob(f'ncar-cesm2-lens/tseries/*nc')
filelist
['ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.18500101-18591231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.18600101-18691231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.18700101-18791231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.18800101-18891231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.18900101-18991231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19000101-19091231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19100101-19191231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19200101-19291231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19300101-19391231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19400101-19491231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19500101-19591231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19600101-19691231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19700101-19791231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19800101-19891231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19900101-19991231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.20000101-20091231.nc',
 'ncar-cesm2-lens/tseries/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.20100101-20141231.nc']

We can check the size of a single dataset here

print(fs.size(filelist[0]) / 1e9, 'gb')
37.649093521 gb

We need to add the s3:// portion at the beginning to specify this is from the s3 filesystem.

Note

We’ll use only last 5 files to reduce access time.

filelist = filelist[-5:]
urls = ["s3://" + f for f in filelist]
so = dict(mode='rb', anon=True, default_fill_cache=False, default_cache_type='first')

Approach#

kerchunk works by first reading a “reference” file that maps virtual chunks of data to byte ranges stored in specific files. This “reference” file looks like a Zarr dataset, and is read using the Zarr library. Previously, we demonstrated the use of Kerchunk to create virtual aggregate datasets without modifying the files on disk.

Here, the goal is to demonstrate cloud-optimized data access enabled by using Zarr as an intermediate layer to access underlying netCDF files.

The approach is similar:

  • We generate a “reference” JSON file, effectively a Zarr representation, for each netCDF file. kerchunk provides a helper function for this. netCDF4 files are HDF5 files so we use that helper function: kerchunk.hdf.SingleHdf5ToZarr

  • We combine these into a single reference file representing the aggregate dataset: MultiZarrToZarr

Generate the Mapper Files#

Set the output directory of the json mapping files

json_dir = 'cesm2_le_reference_files'

We set up a helper function to help with generating these files

def gen_json(u):
    with fs.open(u, **so) as infile:
        h5chunks = SingleHdf5ToZarr(infile, u, inline_threshold=300)
        p = u.split('/')
        fname = p[-1]
        outf = f'{fname}.json'
        print(outf)
        with open(f"{json_dir}/{outf}", 'wb') as f:
            f.write(ujson.dumps(h5chunks.translate()).encode());

Spin up a Cluster#

We will spin up a dask cluster to add distributed/parallel computing - we are spinning up 10 workers, each with 25 GB of RAM and a single processor

cluster = NCARCluster()
cluster.scale(5)
client = Client(cluster)
client
/glade/u/home/dcherian/miniconda3/envs/os-obs/lib/python3.11/site-packages/dask_jobqueue/core.py:255: FutureWarning: job_extra has been renamed to job_extra_directives. You are still using it (even if only set to []; please also check config files). If you did not set job_extra_directives yet, job_extra will be respected for now, but it will be removed in a future release. If you already set job_extra_directives, job_extra is ignored and you can remove it.
  warnings.warn(warn, FutureWarning)
/glade/u/home/dcherian/miniconda3/envs/os-obs/lib/python3.11/site-packages/dask_jobqueue/core.py:274: FutureWarning: env_extra has been renamed to job_script_prologue. You are still using it (even if only set to []; please also check config files). If you did not set job_script_prologue yet, env_extra will be respected for now, but it will be removed in a future release. If you already set job_script_prologue, env_extra is ignored and you can remove it.
  warnings.warn(warn, FutureWarning)
/glade/u/home/dcherian/miniconda3/envs/os-obs/lib/python3.11/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 43962 instead
  warnings.warn(

Client

Client-ccd4a03a-c33f-11ed-a1ea-3cecef1b11fa

Connection method: Cluster object Cluster type: dask_jobqueue.PBSCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/dcherian/proxy/43962/status

Cluster Info

Create reference file for each netCDF file#

We can compute these reference files in parallel!

%%time
dask.compute(*[dask.delayed(gen_json)(u) for u in urls], retries=10);
CPU times: user 7.47 s, sys: 580 ms, total: 8.05 s
Wall time: 3min 25s

Merge the reference files together#

The previous process produced a single reference file for each netCDF file - let’s merge these all together to create single store.

reference_files = sorted(glob.glob('cesm2_le_reference_files/*'))
reference_files
['cesm2_le_reference_files/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19700101-19791231.nc.json',
 'cesm2_le_reference_files/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19800101-19891231.nc.json',
 'cesm2_le_reference_files/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.19900101-19991231.nc.json',
 'cesm2_le_reference_files/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.20000101-20091231.nc.json',
 'cesm2_le_reference_files/b.e21.BHISTcmip6.f09_g17.LE2-1001.001.cam.h6.T.20100101-20141231.nc.json']

Now, we can generate the single store using MultiZarrToZarr! We add a few extra arguments here… no need to worry about the details here.

mzz = MultiZarrToZarr(
    reference_files,
    target_options={'anon': False},
    remote_protocol='s3',
    remote_options={'anon': 'True'},  # JSON files
    concat_dims="time",
)

After merging our reference files together, we can output the new json!

%%time
#%%prun -D multizarr_profile
mzz.translate('cesm2-le-tseries-aws.json')
CPU times: user 2.27 s, sys: 578 ms, total: 2.85 s
Wall time: 3.29 s

Read directly from S3 (without kerchunk)#

We went through all that work previously, but let’s take a look at what the performance would be without using this new Kerchunk package.

To open the files, use fsspec again and generate a list of URLs.

fs_s3 = s3fs.S3FileSystem(anon=True)

flist = [fs_s3.open(path, mode="rb") for path in urls]

Now read in with open_mfdataset

Warning

parallel=True does not work at the moment.

Our wall time for lazily reading in the datasets isn’t too bad…

%%time
ds_read_directly = xr.open_mfdataset(
    flist,
    engine="h5netcdf",
    chunks={"lat": 20, "lon": 20},
    data_vars="minimal",
    coords="minimal",
    compat="override",
)
CPU times: user 2.69 s, sys: 310 ms, total: 3 s
Wall time: 9.92 s
ds_read_directly
<xarray.Dataset>
Dimensions:       (lat: 192, lon: 288, zlon: 1, nbnd: 2, lev: 32, ilev: 33,
                   time: 16426)
Coordinates:
  * lat           (lat) float64 -90.0 -89.06 -88.12 -87.17 ... 88.12 89.06 90.0
  * lon           (lon) float64 0.0 1.25 2.5 3.75 ... 355.0 356.2 357.5 358.8
  * zlon          (zlon) float64 0.0
  * lev           (lev) float64 3.643 7.595 14.36 24.61 ... 957.5 976.3 992.6
  * ilev          (ilev) float64 2.255 5.032 10.16 18.56 ... 967.5 985.1 1e+03
  * time          (time) object 1970-01-01 00:00:00 ... 2015-01-01 00:00:00
Dimensions without coordinates: nbnd
Data variables: (12/27)
    gw            (lat) float64 dask.array<chunksize=(20,), meta=np.ndarray>
    zlon_bnds     (zlon, nbnd) float64 dask.array<chunksize=(1, 2), meta=np.ndarray>
    hyam          (lev) float64 dask.array<chunksize=(32,), meta=np.ndarray>
    hybm          (lev) float64 dask.array<chunksize=(32,), meta=np.ndarray>
    P0            float64 ...
    hyai          (ilev) float64 dask.array<chunksize=(33,), meta=np.ndarray>
    ...            ...
    n2ovmr        (time) float64 dask.array<chunksize=(3650,), meta=np.ndarray>
    f11vmr        (time) float64 dask.array<chunksize=(3650,), meta=np.ndarray>
    f12vmr        (time) float64 dask.array<chunksize=(3650,), meta=np.ndarray>
    sol_tsi       (time) float64 dask.array<chunksize=(3650,), meta=np.ndarray>
    nsteph        (time) float64 dask.array<chunksize=(3650,), meta=np.ndarray>
    T             (time, lev, lat, lon) float64 dask.array<chunksize=(3650, 32, 20, 20), meta=np.ndarray>
Attributes:
    Conventions:       CF-1.0
    source:            CAM
    case:              b.e21.BHISTcmip6.f09_g17.LE2-1001.001
    logname:           sunseon
    host:              mom2
    initial_file:      b.e21.B1850.f09_g17.CMIP6-piControl.001.cam.i.1001-01-...
    topography_file:   /mnt/lustre/share/CESM/cesm_input/atm/cam/topo/fv_0.9x...
    model_doi_url:     https://doi.org/10.5065/D67H1H0V
    time_period_freq:  day_1

But, if we actually load in some data to plot, you’ll see that it is pretty slow…

%%time
ds_read_directly.T.isel(time=range(1460), lev=0, lat=0, lon=0).hvplot(x='time', grid=True)
WARNING:param.CurvePlot00876: Converting cftime.datetime from a non-standard calendar (noleap) to a standard calendar for plotting. This may lead to subtle errors in formatting dates, for accurate tick formatting switch to the matplotlib backend.
CPU times: user 42.7 s, sys: 3.07 s, total: 45.8 s
Wall time: 17min 12s

Benchmark Reading in from Kerchunk mappings#

Now, we can try reading in using our reference files (created from Kerchunk)

fs = fsspec.filesystem(
    "reference",
    fo='cesm2-le-tseries-aws.json',
    remote_protocol='s3',
    remote_options={'anon': True},
)
m = fs.get_mapper("")

Read the files in an zarr instead of netCDF (remember the reference file is a virtual Zarr dataset).

%%time
ds = xr.open_dataset(m, engine="zarr")
ds
<timed exec>:1: RuntimeWarning: Failed to open Zarr store with consolidated metadata, but successfully read with non-consolidated metadata. This is typically much slower for opening a dataset. To silence this warning, consider:
1. Consolidating metadata in this existing store with zarr.consolidate_metadata().
2. Explicitly setting consolidated=False, to avoid trying to read consolidate metadata, or
3. Explicitly setting consolidated=True, to raise an error in this case instead of falling back to try reading non-consolidated metadata.
CPU times: user 212 ms, sys: 22.3 ms, total: 234 ms
Wall time: 484 ms
<xarray.Dataset>
Dimensions:       (time: 16426, lev: 32, lat: 192, lon: 288, ilev: 33, nbnd: 2,
                   zlon: 1)
Coordinates:
  * ilev          (ilev) float64 2.255 5.032 10.16 18.56 ... 967.5 985.1 1e+03
  * lat           (lat) float64 -90.0 -89.06 -88.12 -87.17 ... 88.12 89.06 90.0
  * lev           (lev) float64 3.643 7.595 14.36 24.61 ... 957.5 976.3 992.6
  * lon           (lon) float64 0.0 1.25 2.5 3.75 ... 355.0 356.2 357.5 358.8
  * time          (time) object 1970-01-01 00:00:00 ... 2015-01-01 00:00:00
  * zlon          (zlon) float64 0.0
Dimensions without coordinates: nbnd
Data variables: (12/27)
    P0            (time) float64 ...
    T             (time, lev, lat, lon) float64 ...
    ch4vmr        (time) float64 ...
    co2vmr        (time) float64 ...
    date          (time) float64 ...
    date_written  (time) object ...
    ...            ...
    nscur         (time) float64 ...
    nsteph        (time) float64 ...
    sol_tsi       (time) float64 ...
    time_bnds     (time, nbnd) object ...
    time_written  (time) object ...
    zlon_bnds     (time, zlon, nbnd) float64 ...
Attributes:
    Conventions:       CF-1.0
    case:              b.e21.BHISTcmip6.f09_g17.LE2-1001.001
    host:              mom2
    initial_file:      b.e21.B1850.f09_g17.CMIP6-piControl.001.cam.i.1001-01-...
    logname:           sunseon
    model_doi_url:     https://doi.org/10.5065/D67H1H0V
    source:            CAM
    time_period_freq:  day_1
    topography_file:   /mnt/lustre/share/CESM/cesm_input/atm/cam/topo/fv_0.9x...

Plot the output from the Mapper Method#

%%time
ds.T.isel(time=range(1460), lev=0, lat=0, lon=0).hvplot(x='time', grid=True)
WARNING:param.CurvePlot00983: Converting cftime.datetime from a non-standard calendar (noleap) to a standard calendar for plotting. This may lead to subtle errors in formatting dates, for accurate tick formatting switch to the matplotlib backend.
CPU times: user 18 s, sys: 2.09 s, total: 20.1 s
Wall time: 44.2 s