Thinking through CESM data access#
We want to read a large number of netCDF files, combine them to form a single dataset, and then analyze that. How do we think about it?
In pseudocode we want
# loop over every file and read in metadata
datasets = [xr.open_dataset(file) for file in files]
# optionally make modifications
preprocessed = [preprocess(dataset) for dataset in datasets]
# combine to create a single dataset
combined = xr.combine_XXX(preprocessed, ...)
Xarray’s open_mfdataset implements this pattern, with the option of parallelizing the loop over all files using dask. This can be quite handy.
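Here is a minimal sketch of what that call looks like, using the names from the pseudocode above (files and preprocess as defined there); the combine-related options are discussed in detail below.
import xarray as xr

# open, preprocess, and combine in one call; dask parallelizes the per-file reads
combined = xr.open_mfdataset(
    files,
    preprocess=preprocess,  # optional per-file modification
    combine="nested",       # or "by_coords"; see the discussion below
    concat_dim="ensemble",  # required with combine="nested"
    parallel=True,          # open each file in parallel using dask
)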
Creating a new data pipeline#
First create a list of files#
The glob package is good for this (docs):
The glob module finds all the pathnames matching a specified pattern according to the rules used by the Unix shell, although results are returned in arbitrary order. No tilde expansion is done, but *, ?, and character ranges expressed with [] will be correctly matched.
Important
The tilde ~ is not expanded to the user’s home directory. Use os.path.expanduser for that.
The list of files returned by glob is not sorted! Use sorted to sort the list.
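For example, combining both fixes (with a hypothetical path, just for illustration):
import glob
import os

# expand "~" explicitly, then sort the matches for a stable, reproducible order
pattern = os.path.expanduser("~/data/cesm/*.nc")  # hypothetical location
files = sorted(glob.glob(pattern))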
Here’s a list of files: these are timeseries files with output for years 1850-2100, and 50 ensemble members.
import glob
files = glob.glob(
"/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/*smbb**h1**18500101-21001231*"
)
files
['/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1251.011.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1251.012.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1281.019.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1071.004.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1171.009.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1151.008.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1231.018.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1301.014.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1111.006.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1301.013.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1251.018.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1281.015.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1251.016.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1091.005.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1251.020.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1251.015.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1251.013.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1301.017.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1301.011.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1011.001.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1281.011.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1281.018.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1301.018.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1231.014.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1281.013.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1231.019.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1231.015.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1281.016.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1131.007.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1301.012.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1231.020.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1051.003.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1191.010.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1231.016.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1231.013.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1281.014.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1301.019.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1231.011.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1301.016.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1231.012.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1301.015.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1251.019.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1281.017.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1251.014.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1231.017.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1301.020.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1281.020.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1251.017.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1031.002.cam.h1.PRECT.18500101-21001231.nc',
'/glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1281.012.cam.h1.PRECT.18500101-21001231.nc']
There are 50 files, one per ensemble member.
len(files)
50
Start by opening a single file#
import xarray as xr
single = xr.open_dataset(files[0])
single
<xarray.Dataset> Dimensions: (time: 8761, bnds: 2, lon: 288, lat: 192) Coordinates: * time (time) object 2000-01-01 00:00:00 ... 2023-12-31 00:00:00 * lon (lon) float64 0.0 1.25 2.5 3.75 5.0 ... 355.0 356.2 357.5 358.8 * lat (lat) float64 -90.0 -89.06 -88.12 -87.17 ... 88.12 89.06 90.0 Dimensions without coordinates: bnds Data variables: time_bnds (time, bnds) object ... PRECT (time, lat, lon) float32 ... Attributes: (12/13) CDI: Climate Data Interface version 2.0.2 (https://mpimet.m... Conventions: CF-1.0 source: CAM case: b.e21.BHISTsmbb.f09_g17.LE2-1251.011 logname: sunseon host: mom1 ... ... topography_file: /mnt/lustre/share/CESM/cesm_input/atm/cam/topo/fv_0.9x... model_doi_url: https://doi.org/10.5065/D67H1H0V time_period_freq: day_1 history: Wed May 17 11:54:18 2023: cdo selvar,PRECT tmp.nc PREC... NCO: netCDF Operators version 5.0.3 (Homepage = http://nco.... CDO: Climate Data Operators version 2.0.1 (https://mpimet.m...
First, check the data size:
single.nbytes / 1e9 # approx GB
1.938007128
Each file is about 20GB and we have 50 of them, so approximately a terabyte in total. We will have to use dask.
That means we need to make chunking decisions.
Later on, we will extract time series at a single point, so let’s chunk in space, choosing chunk sizes for the data variable PRECT.
Start by looking at dimension names for PRECT
single.PRECT
<xarray.DataArray 'PRECT' (time: 8761, lat: 192, lon: 288)> [484448256 values with dtype=float32] Coordinates: * time (time) object 2000-01-01 00:00:00 ... 2023-12-31 00:00:00 * lon (lon) float64 0.0 1.25 2.5 3.75 5.0 ... 355.0 356.2 357.5 358.8 * lat (lat) float64 -90.0 -89.06 -88.12 -87.17 ... 87.17 88.12 89.06 90.0 Attributes: long_name: Total (convective and large-scale) precipitation rate (liq... units: m/s cell_methods: time: mean
Choosing a chunk size#
This is a timeseries file with daily average output using the noleap calendar.
We will concatenate ensemble members together to create a single dataset along a new dimension "ensemble". Today, we cannot create an xarray dataset with chunksizes that span files. In other words, because there is one file per ensemble member, and we are concatenating ensemble members along a new dimension, the chunksize for the new dimension will be one. It is possible to rechunk later, but that will involve expensive communication that is best to avoid unless you really need to do so.
We could chunk along space because we want to plot time series at a single point later. After some experimenting, we choose a size of 16 along lat, 32 along lon, and all timesteps in a single chunk, for a chunksize of ~180MB.
Tip
Many other chunking choices are possible; it all depends on what you want to do later. For example, we could have bigger spatial chunks and smaller chunks along time. Here is some reading material on chunking with xarray and dask:
When choosing the size of chunks it is best to make them neither too small, nor too big (around 100MB is often reasonable). Each chunk needs to be able to fit into the worker memory and operations on that chunk should take some non-trivial amount of time (more than 100ms). For many more recommendations take a look at the docs on chunks…
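# an alternative: chunk along time instead, keeping the full spatial field in each chunk (~5 years of daily data per chunk)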
single.PRECT.chunk({"time": 365 * 5})
<xarray.DataArray 'PRECT' (time: 8761, lat: 192, lon: 288)> dask.array<xarray-<this-array>, shape=(8761, 192, 288), dtype=float32, chunksize=(1825, 192, 288), chunktype=numpy.ndarray> Coordinates: * time (time) object 2000-01-01 00:00:00 ... 2023-12-31 00:00:00 * lon (lon) float64 0.0 1.25 2.5 3.75 5.0 ... 355.0 356.2 357.5 358.8 * lat (lat) float64 -90.0 -89.06 -88.12 -87.17 ... 87.17 88.12 89.06 90.0 Attributes: long_name: Total (convective and large-scale) precipitation rate (liq... units: m/s cell_methods: time: mean
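For comparison, here is the spatial chunking described above; the exact chunk sizes reported will depend on the length of the time axis in your files.
# all timesteps in one chunk; 16 points along lat and 32 along lon per chunk
single.PRECT.chunk({"lat": 16, "lon": 32})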
Test with a small subset#
Let’s try reading just 3 files to make sure the output looks as we expect.
We choose combine="nested" instead of combine="by_coords". This simply concatenates the files in the order provided. The term "nested" is used because it can accept a nested list-of-lists as input and concatenate along multiple dimensions. In contrast, by_coords looks at coordinate values and decides which dimensions to concatenate along. This can sometimes backfire, so it is almost always better to be explicit: provide the files in the right order and specify combine="nested".
xr.open_mfdataset(
# make sure we sort
sorted(files)[:3],
# concatenate along a new dimension called "ensemble"
concat_dim="ensemble",
# just concatenate them together
combine="nested",
chunks={"lat": 16, "lon": 32},
parallel=True,
)
<xarray.Dataset> Dimensions: (time: 8761, ensemble: 3, bnds: 2, lon: 288, lat: 192) Coordinates: * time (time) object 2000-01-01 00:00:00 ... 2023-12-31 00:00:00 * lon (lon) float64 0.0 1.25 2.5 3.75 5.0 ... 355.0 356.2 357.5 358.8 * lat (lat) float64 -90.0 -89.06 -88.12 -87.17 ... 88.12 89.06 90.0 Dimensions without coordinates: ensemble, bnds Data variables: time_bnds (ensemble, time, bnds) object dask.array<chunksize=(1, 8761, 2), meta=np.ndarray> PRECT (ensemble, time, lat, lon) float32 dask.array<chunksize=(1, 8761, 16, 32), meta=np.ndarray> Attributes: (12/13) CDI: Climate Data Interface version 2.0.2 (https://mpimet.m... Conventions: CF-1.0 source: CAM case: b.e21.BHISTsmbb.f09_g17.LE2-1011.001 logname: sunseon host: mom2 ... ... topography_file: /mnt/lustre/share/CESM/cesm_input/atm/cam/topo/fv_0.9x... model_doi_url: https://doi.org/10.5065/D67H1H0V time_period_freq: day_1 history: Wed May 17 11:17:29 2023: cdo selvar,PRECT tmp.nc PREC... NCO: netCDF Operators version 5.0.3 (Homepage = http://nco.... CDO: Climate Data Operators version 2.0.1 (https://mpimet.m...
Notice that all variables have been concatenated along the new ensemble dimension, even those we know to be identical across ensemble members: e.g. time_bnds.
Choosing combine options#
Xarray has a number of options to control this concatenation behaviour. The usual recommendation is the hard-to-interpret sequence data_vars="minimal", coords="minimal", compat="override". What does this mean?
"minimal" for data_vars and coords means: only concatenate variables that already have the concatenation dimension.
For variables without the concatenation dimension, xarray consults the compat kwarg. With the default, compat="different", Xarray checks the variable for equality across all files: variables that differ get concatenated, and variables that are identical are simply copied over. This equality check can get quite expensive, so compat="override" lets you skip it and simply pick the variable from the first file. This works well for so-called ‘static variables’, such as grid variables, that are invariant in time (and across ensemble members). The small toy example below illustrates these options on a pair of in-memory datasets.
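Here is a tiny, self-contained illustration of these options using xr.concat; the datasets and variable names are made up for the example.
import numpy as np
import xarray as xr

# two toy "files": "temp" differs between them, "grid_area" is identical/static
ds1 = xr.Dataset({"temp": ("time", np.zeros(2)), "grid_area": ("x", np.ones(3))})
ds2 = xr.Dataset({"temp": ("time", np.ones(2)), "grid_area": ("x", np.ones(3))})

xr.concat(
    [ds1, ds2],
    dim="time",
    data_vars="minimal",  # only "temp" has the "time" dimension, so only it is concatenated
    coords="minimal",
    compat="override",    # "grid_area" is copied from ds1 without any equality check
)
# result: "temp" ends up with time=4, "grid_area" keeps its original shape (x=3)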
Now let’s try these options on our files:
combined = xr.open_mfdataset(
# make sure we sort
    sorted(files)[:3],
# concatenate along a new dimension called "ensemble"
concat_dim="ensemble",
chunks={"lat": 16, "lon": 32},
data_vars="minimal",
coords="minimal",
compat="override",
# just concatenate them together
combine="nested",
parallel=True,
)
combined
<xarray.Dataset> Dimensions: (time: 8761, bnds: 2, lon: 288, lat: 192) Coordinates: * time (time) object 2000-01-01 00:00:00 ... 2023-12-31 00:00:00 * lon (lon) float64 0.0 1.25 2.5 3.75 5.0 ... 355.0 356.2 357.5 358.8 * lat (lat) float64 -90.0 -89.06 -88.12 -87.17 ... 88.12 89.06 90.0 Dimensions without coordinates: bnds Data variables: time_bnds (time, bnds) object dask.array<chunksize=(8761, 2), meta=np.ndarray> PRECT (time, lat, lon) float32 dask.array<chunksize=(8761, 16, 32), meta=np.ndarray> Attributes: (12/13) CDI: Climate Data Interface version 2.0.2 (https://mpimet.m... Conventions: CF-1.0 source: CAM case: b.e21.BHISTsmbb.f09_g17.LE2-1251.011 logname: sunseon host: mom1 ... ... topography_file: /mnt/lustre/share/CESM/cesm_input/atm/cam/topo/fv_0.9x... model_doi_url: https://doi.org/10.5065/D67H1H0V time_period_freq: day_1 history: Wed May 17 11:54:18 2023: cdo selvar,PRECT tmp.nc PREC... NCO: netCDF Operators version 5.0.3 (Homepage = http://nco.... CDO: Climate Data Operators version 2.0.1 (https://mpimet.m...
Oops, this doesn’t work for us! We didn’t concatenate PRECT along the new ensemble dimension.
combined.PRECT.dims
('time', 'lat', 'lon')
Try preprocessing the dataset to make it work better#
Our dataset doesn’t really fit the assumptions of open_mfdataset. Luckily we can modify our datasets before the concatenation stage using the preprocess kwarg (docs):
preprocess: If provided, call this function on each dataset prior to concatenation. You can find the file-name from which each dataset was loaded in ds.encoding[“source”].
What we’ll do is add a new dimension ensemble to the PRECT variable using expand_dims. This makes it clear that only PRECT should be concatenated along the new ensemble dimension.
def add_ensemble_dim(ds):
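    # add a new dimension "ensemble" of size 1, to the PRECT variable only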
ds["PRECT"] = ds.PRECT.expand_dims("ensemble")
return ds
combined = xr.open_mfdataset(
# make sure we sort
sorted(files)[:3],
# chunk the dataset from each file properly
chunks={"lat": 16, "lon": 32},
# concatenate along a new dimension called "ensemble"
concat_dim="ensemble",
data_vars="minimal",
coords="minimal",
compat="override",
combine="nested",
parallel=True,
preprocess=add_ensemble_dim,
)
combined
<xarray.Dataset> Dimensions: (time: 8761, bnds: 2, lon: 288, lat: 192, ensemble: 3) Coordinates: * time (time) object 2000-01-01 00:00:00 ... 2023-12-31 00:00:00 * lon (lon) float64 0.0 1.25 2.5 3.75 5.0 ... 355.0 356.2 357.5 358.8 * lat (lat) float64 -90.0 -89.06 -88.12 -87.17 ... 88.12 89.06 90.0 Dimensions without coordinates: bnds, ensemble Data variables: time_bnds (time, bnds) object dask.array<chunksize=(8761, 2), meta=np.ndarray> PRECT (ensemble, time, lat, lon) float32 dask.array<chunksize=(1, 8761, 16, 32), meta=np.ndarray> Attributes: (12/13) CDI: Climate Data Interface version 2.0.2 (https://mpimet.m... Conventions: CF-1.0 source: CAM case: b.e21.BHISTsmbb.f09_g17.LE2-1011.001 logname: sunseon host: mom2 ... ... topography_file: /mnt/lustre/share/CESM/cesm_input/atm/cam/topo/fv_0.9x... model_doi_url: https://doi.org/10.5065/D67H1H0V time_period_freq: day_1 history: Wed May 17 11:17:29 2023: cdo selvar,PRECT tmp.nc PREC... NCO: netCDF Operators version 5.0.3 (Homepage = http://nco.... CDO: Climate Data Operators version 2.0.1 (https://mpimet.m...
Much better!
combined.PRECT
<xarray.DataArray 'PRECT' (ensemble: 3, time: 8761, lat: 192, lon: 288)> dask.array<concatenate, shape=(3, 8761, 192, 288), dtype=float32, chunksize=(1, 8761, 16, 32), chunktype=numpy.ndarray> Coordinates: * time (time) object 2000-01-01 00:00:00 ... 2023-12-31 00:00:00 * lon (lon) float64 0.0 1.25 2.5 3.75 5.0 ... 355.0 356.2 357.5 358.8 * lat (lat) float64 -90.0 -89.06 -88.12 -87.17 ... 87.17 88.12 89.06 90.0 Dimensions without coordinates: ensemble Attributes: long_name: Total (convective and large-scale) precipitation rate (liq... units: m/s cell_methods: time: mean
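Aside: the preprocess docs quoted earlier note that the source filename is available as ds.encoding["source"]. We could use that to also label the new ensemble dimension with the member ID parsed from each filename. A sketch, with a hypothetical helper whose filename parsing assumes the CESM-LE naming shown above:
import os

def add_labeled_ensemble_dim(ds):
    # e.g. "b.e21.BHISTsmbb.f09_g17.LE2-1011.001.cam.h1.PRECT.18500101-21001231.nc" -> "LE2-1011.001"
    fname = os.path.basename(ds.encoding["source"])
    member = ".".join(fname.split(".")[4:6])
    # add the ensemble dimension to PRECT only, with the member ID as its coordinate label
    ds["PRECT"] = ds.PRECT.expand_dims(ensemble=[member])
    return ds
Passing preprocess=add_labeled_ensemble_dim would then give an ensemble coordinate labelled by member ID instead of a bare, unlabelled dimension.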
Read and concatenate the whole dataset#
Create a dask cluster#
We’ll use an adaptive cluster to be polite.
The dask cluster helps by parallelizing the initial reading of every file.
import dask_jobqueue
cluster = dask_jobqueue.PBSCluster(
cores=4, # The number of cores you want
memory="23GB", # Amount of memory
processes=1, # How many processes
queue="casper", # The type of queue to utilize (/glade/u/apps/dav/opt/usr/bin/execcasper)
local_directory="/local_scratch/pbs.$PBS_JOBID/dask/spill",
log_directory="/glade/scratch/dcherian/dask/",
resource_spec="select=1:ncpus=4:mem=23GB", # Specify resources
project="ncgd0011", # Input your project ID here
walltime="02:00:00", # Amount of wall time
interface="ib0", # Interface to use
)
# create an adaptive cluster with one job always requested,
# scale to a maximum of 6 jobs
# and hold on to each job for 600 seconds of idle time
cluster.adapt(minimum_jobs=1, maximum_jobs=6, wait_count=600)
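If you are not on a PBS system like Casper, a local cluster works the same way for testing; a minimal sketch:
import distributed

# a local alternative for experimenting off the HPC system
cluster = distributed.LocalCluster(n_workers=4, threads_per_worker=1, memory_limit="4GB")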
import distributed
client = distributed.Client(cluster)
client
/glade/u/home/dcherian/miniconda3/envs/pump/lib/python3.10/site-packages/dask_jobqueue/core.py:20: FutureWarning: tmpfile is deprecated and will be removed in a future release. Please use dask.utils.tmpfile instead.
from distributed.utils import tmpfile
/glade/u/home/dcherian/miniconda3/envs/pump/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 45848 instead
warnings.warn(
Client: Client-6ae07e09-3c6e-11ee-81c8-3cecef1b11f8 (connection method: Cluster object; cluster type: dask_jobqueue.PBSCluster)
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/dcherian/casper/proxy/45848/status
Scheduler: tcp://10.12.206.35:45783
Workers: 0, Total threads: 0, Total memory: 0 B
Read#
Now we can scale it up.
We generalize a little by having add_ensemble_dim expand the dimensions of any variable with 3 or more dimensions.
def add_ensemble_dim(ds):
    # find all variables with 3 or more dimensions
names = [name for name, variable in ds.variables.items() if variable.ndim >= 3]
# add a new dimension `ensemble` of size 1
# and replace the existing 3D variables.
ds = ds.update(ds[names].expand_dims("ensemble"))
return ds
combined = xr.open_mfdataset(
# make sure we sort
sorted(files),
# chunk each individual file
chunks={"time": 365 * 5},
# Add the ensemble dimension to 3D variables
preprocess=add_ensemble_dim,
# concatenate along a new dimension called "ensemble"
concat_dim="ensemble",
# only concatenate variables with the `ensemble` dimension.
data_vars="minimal",
coords="minimal",
compat="override",
combine="nested",
# parallelize reading of each file using dask
parallel=True,
)
combined
<xarray.Dataset> Dimensions: (time: 8761, bnds: 2, lon: 288, lat: 192, ensemble: 50) Coordinates: * time (time) object 2000-01-01 00:00:00 ... 2023-12-31 00:00:00 * lon (lon) float64 0.0 1.25 2.5 3.75 5.0 ... 355.0 356.2 357.5 358.8 * lat (lat) float64 -90.0 -89.06 -88.12 -87.17 ... 88.12 89.06 90.0 Dimensions without coordinates: bnds, ensemble Data variables: time_bnds (time, bnds) object dask.array<chunksize=(1825, 2), meta=np.ndarray> PRECT (ensemble, time, lat, lon) float32 dask.array<chunksize=(1, 1825, 192, 288), meta=np.ndarray> Attributes: (12/13) CDI: Climate Data Interface version 2.0.2 (https://mpimet.m... Conventions: CF-1.0 source: CAM case: b.e21.BHISTsmbb.f09_g17.LE2-1011.001 logname: sunseon host: mom2 ... ... topography_file: /mnt/lustre/share/CESM/cesm_input/atm/cam/topo/fv_0.9x... model_doi_url: https://doi.org/10.5065/D67H1H0V time_period_freq: day_1 history: Wed May 17 11:17:29 2023: cdo selvar,PRECT tmp.nc PREC... NCO: netCDF Operators version 5.0.3 (Homepage = http://nco.... CDO: Climate Data Operators version 2.0.1 (https://mpimet.m...
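With the combined dataset in hand, the analysis we had in mind, a time series at a single point, is now straightforward. A sketch, using an arbitrary illustrative location:
# ensemble-mean precipitation time series at one (illustrative) location
point = combined.PRECT.sel(lat=40.0, lon=255.0, method="nearest")
timeseries = point.mean("ensemble")
# nothing is read from disk until you call e.g. timeseries.compute() or timeseries.plot()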
Note that on-disk chunking matters#
Running
ncdump -sh /glade/scratch/anukesh/CESM-LE/PRECT/ENSEMBLE/PRECT/b.e21.BHISTsmbb.f09_g17.LE2-1251.011.cam.h1.PRECT.18500101-21001231.nc
shows
float PRECT(time, lat, lon) ;
PRECT:units = "m/s" ;
PRECT:long_name = "Total (convective and large-scale) precipitation rate (liq + ice)" ;
PRECT:cell_methods = "time: mean" ;
PRECT:_Storage = "chunked" ;
PRECT:_ChunkSizes = 1, 192, 288 ;
PRECT:_DeflateLevel = 1 ;
PRECT:_Shuffle = "true" ;
PRECT:_Endianness = "little" ;
This bit is important: PRECT:_ChunkSizes = 1, 192, 288 ;
The data on disk is chunked with a chunksize of 1 along time and all spatial points in a single chunk. This is orthogonal to our proposed chunking scheme of small in space and big in time (chunks={"lat": 16, "lon": 32}).
Actually reading data with chunks={"lat": 16, "lon": 32} would be quite slow: each dask chunk spans all of time but only a small spatial region, so building it requires touching every on-disk chunk, i.e. reading and decompressing essentially the whole file, and that work is repeated for every dask chunk. This is why we chunked along time instead when reading the full dataset above.
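The same information is visible from Python: xarray’s netCDF4 backend records the on-disk chunking and compression settings in the variable’s encoding (the exact contents can vary with backend and version).
# on-disk chunk sizes and compression settings as recorded by xarray
single.PRECT.encoding
# expect something like {'chunksizes': (1, 192, 288), 'zlib': True, 'complevel': 1, 'shuffle': True, ...}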
Tip
See this Unidata blogpost on netCDF chunking for more.