Dask Chunking - Best Practices#
ESDS Dask tutorial | 06 February, 2023
Negin Sobhani, Brian Vanderwende, Deepak Cherian, Ben Kirk
Computational & Information Systems Lab (CISL)
negins@ucar.edu, vanderwb@ucar.edu
In this tutorial, you will learn:#
Basic rules of thumb for chunking
The importance of conforming to file chunks
The impact of rechunking in the computational pipeline
Related Documentation
Chunking Considerations#
Determining the best approach for sizing your Dask chunks can be tricky and often requires intuition about both Dask and your particular dataset. There are various considerations you may need to account for depending on your workflow:
The size (in bytes) of your chunks vs your number of workers
The chunk layout of data read from disk (formats like HDF5, Zarr)
The access patterns of your computational pipeline
Dask Array with NumPy array chunks…
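As a quick illustration of this structure, the short sketch below (with arbitrary sizes) builds a Dask array out of NumPy chunks and inspects the resulting layout:
import dask.array as da
# A 2D Dask array whose blocks are (2500, 5000) NumPy arrays
x = da.ones((10000, 10000), chunks=(2500, 5000))
print(x.chunksize)    # shape of a single chunk: (2500, 5000)
print(x.numblocks)    # number of chunks along each dimension: (4, 2)
print(x.npartitions)  # total number of chunks: 8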
Starting up our PBS Cluster#
To demonstrate the effects of different chunking strategies, let's instantiate a PBSCluster with 4 workers
import dask
from dask_jobqueue import PBSCluster
from dask.distributed import Client
# Create a PBS cluster object
cluster = PBSCluster(
job_name = 'dask-wk23-chunking',
cores = 1,
memory = '10GiB',
processes = 1,
local_directory = '/glade/scratch/vanderwb/temp/dask/spill/pbs.$PBS_JOBID',
resource_spec = 'select=1:ncpus=1:mem=10GB',
queue = 'casper',
walltime = '30:00',
interface = 'ext'
)
# Sanity-check our setup
print(cluster.job_script())
client = Client(cluster)
client
Chunk size - Load balancing vs. Overhead#
There is always an optimal chunk size given your hardware setup and computation problem that is neither too big nor too small. Finding this chunk size often requires some trial and error, but it is helpful to know what you are looking to avoid:
Too small - if your chunks are too small, you will end up spending a significant and wasteful amount of time on Dask overhead (task scheduling, data communication) relative to the time spent computing
Too large - you run the risk of spilling to disk or outright memory failures, and the scheduler also has a more difficult time load balancing
The following rules of thumb are commonly used, though the optimal size will vary according to your workflow:
| Too Small | Possibly Too Small | Optimal | Too Large |
|---|---|---|---|
| < 1 MB | 1-100 MB | 100 MB - 1 GB | > Spill threshold |
In practice, using chunks close to 0.1-0.5 GB in size works well.
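When applying these rules of thumb, it helps to know how many bytes a chunk actually occupies. A minimal sketch (the shapes here are arbitrary):
import numpy as np
import dask.array as da
from dask.utils import format_bytes
x = da.random.random((60000, 72000), chunks=(10000, 6000))
# bytes per chunk = elements per chunk * bytes per element (float64 = 8 bytes)
chunk_bytes = np.prod(x.chunksize) * x.dtype.itemsize
print(format_bytes(chunk_bytes))  # ~458 MiB, inside the "optimal" range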
Let’s test these rules of thumb…#
# Spin up workers on our PBS cluster
cluster.scale(4)
client.wait_for_workers(4)
For this exercise, we will simply generate a Dask array of random numbers large enough that it would not fit in our login session memory. Let's try different chunking strategies.
import dask.array as da
t = da.random.random((60000, 72000), chunks = (30000,36000))
t
These chunks are too large. They will exceed our spill threshold (0.6-0.7) and even briefly exceed our pause limit (0.8). The only thing working in our favor in this configuration is that non-aggregation tasks should be well-balanced among the 4 workers with 4 chunks, and we have a short task graph.
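The spill and pause fractions referenced above come from the distributed worker memory settings; if you want to verify what your cluster is using, they can be read from the Dask configuration (assuming the default configuration keys):
# Fractions of the worker memory limit at which Dask takes action
for key in ("target", "spill", "pause", "terminate"):
    print(key, dask.config.get(f"distributed.worker.memory.{key}"))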
task = t.mean()
task.dask
%%time
result = task.compute()
In this next case, we specify very small chunks relative to the problem size. We will not come close to the memory limits, but we will incur significant overhead relative to our computational task.
t = da.random.random((60000, 72000), chunks = (1000,1000))
t
task = t.mean()
task.dask
%%time
result = task.compute()
Next, we will choose chunk sizes that fall in our expected "optimal" range of 100 MiB - 1 GiB. We should be allowing Dask to distribute work efficiently but not imposing a high overhead…
t = da.random.random((60000, 72000), chunks = (10000,6000))
t
%%time
result = t.mean().compute()
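As an aside, if you would rather not work out chunk shapes by hand, Dask array creation functions also accept a target byte size for the chunks argument; a quick sketch under that documented usage:
# Let Dask pick a uniform block shape targeting ~500 MiB per chunk
t_sized = da.random.random((60000, 72000), chunks="500 MiB")
print(t_sized.chunksize)
# Or set the default used by chunks="auto" for all arrays
with dask.config.set({"array.chunk-size": "256 MiB"}):
    t_auto = da.random.random((60000, 72000), chunks="auto")
    print(t_auto.chunksize)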
Matching chunking in a netCDF4 file#
If you are using a chunked data format, it is best to specify Dask chunks that are equal to or (better yet) multiples of the chunk shape on disk. If your chunk sizes are not multiples of the disk chunks, you risk unnecessary additional reads, since multiple disk chunks will need to be read to populate each Dask chunk. This can be very inefficient!
Inspecting file chunking#
The exact process for checking file chunking depends on the format. Using the netCDF4 Python module, we can query the chunking parameters of any variable in a netCDF4 file.
Classic netCDF files do not support chunking!
import netCDF4 as nc
We will use a data file from a model forecast dataset over the Arctic:
my_file = '/glade/collections/rda/data/ds631.1/asr15.fcst3.3D/asr15km.fct.3D.20120916.nc'
Once we open the dataset (nc4 file), we can reference a variable of interest using a dictionary key and then get the dimensions of that variable using get_dims().
nc_data = nc.Dataset(my_file)
nc_data['CLDFRA'].get_dims()
We can then use the chunking() method to get our chunk size for each dimension:
nc_data['CLDFRA'].chunking()
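To survey chunking across the whole file rather than one variable at a time, we can loop over all variables with the same netCDF4 calls (a small sketch):
# Print the on-disk chunk shape for every variable in the file
# (chunking() returns 'contiguous' for unchunked variables)
for name, var in nc_data.variables.items():
    print(f"{name}: shape={var.shape}, chunking={var.chunking()}")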
Specifying chunks using Xarray#
Now that we understand our file chunks, we can specify a preferred chunk size to open_dataset. Note that if we use the chunks parameter, any dimension we don't mention will be spanned in its entirety by each chunk.
import xarray as xr
# Open dataset using chunking along Time dimension
ds = xr.open_dataset(my_file, chunks = {'Time' : 1})
Since we are only specifying a chunk size for Time, this should be equivalent to the following chunk shape:
chunks = {'Time' : 1,
'num_metgrid_levels' : -1,
'south_north' : -1,
'west_east' : -1 }
We can confirm that our chunks look as intended using the DataArray repr:
ds.CLDFRA
Note: You can also retrieve the file chunk size from Xarray itself, but it is not shown in the above repr. Use the following DataArray (variable) attribute instead:
ds.CLDFRA.encoding["chunksizes"]
Now let's benchmark various chunk configurations. Our initial guess achieves the recommended ratio of >= 2 chunks per worker, but it does not use exact multiples of the file chunk size except in the time dimension.
For this benchmark, we will find the maximum cloud fraction across vertical levels at all locations and times.
%%time
result = ds.CLDFRA.max(dim = "num_metgrid_levels").compute()
Notice above that the file's chunk sizes do not divide evenly into the dimension sizes. We can specify chunks that match the file chunks directly, but this will leave "remainder" chunks and slightly increase overhead.
ds = xr.open_dataset(my_file, chunks = {'Time' : 1, "num_metgrid_levels" : 16,
"south_north" : 355, "east_west" : 355})
ds.CLDFRA
%%time
result = ds.CLDFRA.max(dim = "num_metgrid_levels").compute()
The most problematic case occurs when we have chunk sizes that are smaller than the file chunks in one or more dimensions. Let's evaluate the impact by using progressively smaller chunk sizes in the vertical dimension:
# Using half the file chunk size in the vertical (same number of chunks)
ds = xr.open_dataset(my_file, chunks = {"Time" : 4, "num_metgrid_levels" : 8})
%%time
result = ds.CLDFRA.max(dim = "num_metgrid_levels").compute()
# Use 1/4 the chunk size in the vertical
ds = xr.open_dataset(my_file, chunks = {"Time" : 8, "num_metgrid_levels" : 4})
%%time
result = ds.CLDFRA.max(dim = "num_metgrid_levels").compute()
It is also possible to use “auto” chunking, whereby the DataArray chunks are calculated for you. Are these optimal?
# Open dataset using auto-chunking
ds = xr.open_dataset(my_file, chunks = 'auto')
ds.CLDFRA
%%time
result = ds.CLDFRA.max(dim = "num_metgrid_levels").compute()
No! Avoid using auto chunking for files written in chunks!
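One way to avoid this pitfall is to build the chunks argument from the file chunking recorded in the variable's encoding (shown earlier). The sketch below scales each file chunk by a small multiplier; the multiplier of 2 is an arbitrary choice you would tune to land in the 100 MiB - 1 GiB range:
# Derive Dask chunks as multiples of the on-disk chunks
file_chunks = ds.CLDFRA.encoding["chunksizes"]   # e.g. (1, 16, 355, 355)
dims = ds.CLDFRA.dims
multiplier = 2  # arbitrary; increase until chunks reach the optimal size range
chunk_spec = {dim: size * multiplier for dim, size in zip(dims, file_chunks)}
ds_matched = xr.open_dataset(my_file, chunks=chunk_spec)
ds_matched.CLDFRA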
Rechunking is expensive#
There are various reasons Dask might need to rechunk data, but in any case, it can be an expensive operation with a large amount of communication required between workers.
Scenario: We wish to get the mean difference between two versions of a model for the same case study. Unfortunately, while the grids match for each version, the file chunk size used was different.
Here, we will emulate the scenario with Dask Arrays…
old_run = da.random.random((800,600,60,20), chunks = (400,300,30,1))
old_run
new_run = da.random.random((800,600,60,20), chunks = (800,600,10,1))
new_run
Let's set up and analyse, via a high-level task graph, the operations we will need to perform to compute a mean squared difference/error between our two datasets.
# Calculate the mean squared difference
mse_graph = ((old_run - new_run) ** 2).sum() / old_run.size
mse_graph.dask
Note the two rechunking operations near the beginning of our task graph. Because our data arrays are chunked differently, Dask must rechunk first to avoid slowing down operations with large data transfers between workers. It is good that Dask does this, but rechunking is still expensive…
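A quick way to see the mismatch that forces these rechunks is to compare the chunk layouts of the two arrays directly:
# The chunk shapes differ in every dimension except the last
print("old_run:", old_run.chunksize, "->", old_run.npartitions, "chunks")
print("new_run:", new_run.chunksize, "->", new_run.npartitions, "chunks")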
%%time
mse_graph.compute()
In most circumstances, we will want to rechunk this data ourselves manually, and then save state (probably by creating a new rechunked data file). This one-time cost means we will not need to rechunk again in the future.
In our scenario, we would likely rechunk the old run data, since we expect all future runs will have the new chunking.
old_run_rechunked = old_run.rechunk((800,600,10,1))
Once this is done in a conversion workflow, we could load the rechunked data in our current workflow.
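In a real conversion workflow, that one-time save step might look like the sketch below, which writes the rechunked array to a Zarr store and reads it back (the store path is a hypothetical example):
# One-time conversion: persist the rechunked data to disk
da.to_zarr(old_run_rechunked, "old_run_rechunked.zarr", overwrite=True)
# Later workflows then read the already-rechunked data directly
old_run = da.from_zarr("old_run_rechunked.zarr")
Since we do not actually have such a file here, the next cell simply emulates the reloaded data with a new random array that uses the new chunking.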
old_run = da.random.random((800,600,60,20), chunks = (800,600,10,1))
# Calculate the mean squared difference
mse_graph = ((old_run - new_run) ** 2).sum() / old_run.size
mse_graph.dask
%%time
mse_graph.compute()
client.shutdown()
Takeaway Message#
Chunking is fundamental to Dask and its blocked-algorithm approach, so don’t ignore intelligently sizing your data chunks. Finding the perfect chunk size is not the goal, but neglecting simple rules of thumb can lead to massive performance penalties when aggregated over a complex multipart analysis.