Writing multiple netCDF files in parallel with xarray and dask#

A typical computation workflow with xarray consists of:

  1. reading one or more netCDF files into an xarray dataset backed by dask using xr.open_mfdataset() or xr.open_dataset(chunks=...),

  2. applying some transformation to the input dataset, and

  3. saving the resulting output to disk as a netCDF file using the ds.to_netcdf() method.

The last step (3) can easily produce a large netCDF file (>= 10 GB in size). Because to_netcdf() writes everything to a single file, this step runs in serial, can take a very long time to complete, and sometimes hangs. To avoid these issues, you can reach for one of xarray's lesser-known but helpful capabilities: the xr.save_mfdataset() function, which writes multiple datasets to disk as netCDF files simultaneously. Its signature looks like this:

xr.save_mfdataset(
    datasets,
    paths,
    mode='w',
    format=None,
    groups=None,
    engine=None,
    compute=True,
)
Docstring:
Write multiple datasets to disk as netCDF files simultaneously.
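
For instance, the pattern shown in the xarray docstring splits a dataset by year and writes one file per year. A minimal sketch of that pattern (assuming ds is a dataset with a 'time' coordinate):

# Sketch adapted from the xarray docs: write one netCDF file per year.
years, datasets = zip(*ds.groupby('time.year'))
paths = [f'{year}.nc' for year in years]
xr.save_mfdataset(datasets, paths)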

Please show me the code#

Package imports#

import numpy as np
import xarray as xr
from distributed import Client, performance_report

xr.__version__
'0.15.1'
client = Client()
client

Client
  • Workers: 4
  • Cores: 8
  • Memory: 17.18 GB

Load some toy dataset#

ds = xr.tutorial.open_dataset('rasm', chunks={'time': 12})
ds
<xarray.Dataset>
Dimensions:  (time: 36, x: 275, y: 205)
Coordinates:
  * time     (time) object 1980-09-16 12:00:00 ... 1983-08-17 00:00:00
    xc       (y, x) float64 dask.array<chunksize=(205, 275), meta=np.ndarray>
    yc       (y, x) float64 dask.array<chunksize=(205, 275), meta=np.ndarray>
Data variables:
    Tair     (time, y, x) float64 dask.array<chunksize=(12, 205, 275), meta=np.ndarray>
Attributes:
    title:                     /workspace/jhamman/processed/R1002RBRxaaa01a/lnd/temp/R1002RBRxaaa01a.vic.ha.1979-09-01.nc
    institution:               U.W.
    source:                    RACM R1002RBRxaaa01a
    output_frequency:          daily
    output_mode:               averaged
    convention:                CF-1.4
    references:                Based on the initial model of Liang et al., 1994, JGR, 99, 14,415-14,429.
    comment:                   Output from the Variable Infiltration Capacity (VIC) model.
    nco_openmp_thread_number:  1
    NCO:                       "4.6.0"
    history:                   Tue Dec 27 14:15:22 2016: ncatted -a dimensions,,d,, rasm.nc rasm.nc Tue Dec 27 13:38:40 2016: ncks -3 rasm.nc rasm.nc history deleted for brevity
ds.Tair.data.visualize()
[Dask task graph for ds.Tair: three chunks along the time dimension]

Perform a computation on input dataset#

As you can see, our input dataset is chunked along the time dimension, which gives an xarray dataset with 3 chunks. For illustrative purposes, let's apply an arbitrary computation to our dataset:

result = np.sqrt(np.sin(ds) ** 2 + np.cos(ds) ** 2)
result.Tair.data.visualize()
[Dask task graph for result.Tair: the same three chunks, with the computation applied independently to each]

Our resulting xarray dataset has the same dimensions and the same number of chunks as our input dataset.
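
We can confirm this with a quick, illustrative check of the chunk layout:

# Illustrative check: the computation should leave the chunk layout unchanged.
assert result.Tair.data.chunks == ds.Tair.data.chunks
print(dict(result.chunks))  # expect {'time': (12, 12, 12), 'x': (275,), 'y': (205,)}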

Create a helper function to split a dataset into sub-datasets#

Our main objective is to save this resulting dataset into multiple netCDF files by saving each chunk in its own netCDF file. To accomplish this, we need to create a function that allows us to split a dataset into sub-datasets for each chunk. The following code snippet was adapted from this comment by Stephan Hoyer.

import itertools


def split_by_chunks(dataset):
    """Yield one sub-dataset per dask chunk of ``dataset``."""
    chunk_slices = {}
    # Build the list of index slices along each chunked dimension.
    for dim, chunks in dataset.chunks.items():
        slices = []
        start = 0
        for chunk in chunks:
            if start >= dataset.sizes[dim]:
                break
            stop = start + chunk
            slices.append(slice(start, stop))
            start = stop
        chunk_slices[dim] = slices
    # The cartesian product of the per-dimension slices covers every chunk.
    for slices in itertools.product(*chunk_slices.values()):
        selection = dict(zip(chunk_slices.keys(), slices))
        yield dataset[selection]
datasets = list(split_by_chunks(result))

Let’s confirm that we have three items corresponding to our three chunks:

print(len(datasets))
3
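
As another quick, illustrative check, each sub-dataset should span one 12-step time chunk with the full spatial extent:

for d in datasets:
    print(dict(d.sizes))  # expect {'time': 12, 'x': 275, 'y': 205}, three times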

Each item in the list is an xarray dataset corresponding to one sliced chunk. Now that we have a list of datasets, we need another helper function that generates a filepath for a given xarray dataset:

Create a helper function for generating a filepath#

def create_filepath(ds, prefix='filename', root_path="."):
    """
    Generate a filepath from the first and last time values of an xarray dataset
    """
    start = ds.time.data[0].strftime("%Y-%m-%d")
    end = ds.time.data[-1].strftime("%Y-%m-%d")
    filepath = f'{root_path}/{prefix}_{start}_{end}.nc'
    return filepath

Let’s run a small test to make sure this function is working as expected:

create_filepath(datasets[1])
'./filename_1981-09-16_1982-08-17.nc'

Let’s create a list of paths to which to save each corresponding dataset.

paths = [create_filepath(ds) for ds in datasets]
paths
['./filename_1980-09-16_1981-08-17.nc',
 './filename_1981-09-16_1982-08-17.nc',
 './filename_1982-09-16_1983-08-17.nc']

Invoke xr.save_mfdataset()#

At this point we have two lists:

  1. datasets: List of datasets to save.

  2. paths: List of paths to which to save each corresponding dataset.

We are ready to invoke xr.save_mfdataset():

xr.save_mfdataset(datasets=datasets, paths=paths)
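
Note that this call blocks until all files are written. To defer the write, for example to capture a dask performance report with the performance_report context manager imported earlier, you can pass compute=False, which the xarray docs say returns a dask delayed object that can be computed later. A minimal sketch (the report filename is arbitrary):

# Deferred variant: with compute=False, save_mfdataset returns a dask delayed object.
delayed_write = xr.save_mfdataset(datasets=datasets, paths=paths, compute=False)
with performance_report(filename='save_mfdataset_report.html'):
    delayed_write.compute()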

Confirm that the output files were properly written#

new_ds = xr.open_mfdataset(paths, combine='by_coords')
new_ds
<xarray.Dataset>
Dimensions:  (time: 36, x: 275, y: 205)
Coordinates:
    xc       (y, x) float64 dask.array<chunksize=(205, 275), meta=np.ndarray>
    yc       (y, x) float64 dask.array<chunksize=(205, 275), meta=np.ndarray>
  * time     (time) object 1980-09-16 12:00:00 ... 1983-08-17 00:00:00
Data variables:
    Tair     (time, y, x) float64 dask.array<chunksize=(12, 205, 275), meta=np.ndarray>
try:
    xr.testing.assert_identical(result, new_ds)
except AssertionError:
    print('The datasets are not identical!')
else:
    print('The datasets are identical!')
The datasets are identical!

There you have it 🎉🎉🎉🎉

Conclusion#

This article was inspired by a problem that my colleague was running into. I thought it would be useful to document this fix in a blog post. Many thanks to Deepak Cherian for pointing me to some relevant xarray issues and to the @NCAR/xdev team for their valuable and critical feedback on drafts of this article along the way.