Dask on HPC - Starting Clusters, Monitoring, and Debugging#
ESDS Dask tutorial | 06 February, 2023
Negin Sobhani, Brian Vanderwende, Deepak Cherian, Ben Kirk
Computational & Information Systems Lab (CISL)
negins@ucar.edu, vanderwb@ucar.edu
In this tutorial, you will learn:#
How to configure and initialize an HPC Dask cluster via
dask-jobqueue
How to manage and monitor the resource usage of your Dask workers
Understanding Dask worker logs
Controlling how and where data spills from memory to disk
Analyzing the impact of your Dask workflow on your allocation
Starting HPC Dask clusters with dask-jobqueue#
A defining feature of most HPC systems is the batch scheduler - Slurm, PBS, LSF, etc… These schedulers allow us to access the significant resources of the system and scale far beyond what is possible on a personal workstation.
Using Dask on an HPC system is no different - we need to interact with the scheduler to provide Dask with ample compute resources. We could first start a job with multiple cores and a large amount of memory, and then use the LocalCluster to spawn workers. However, this approach only scales to a single node.
The typical approach is to let Dask request resources directly from the job scheduler via a scheduler-specific cluster type. Such clusters are provided by the add-on dask-jobqueue package.
Creating a scheduled-cluster#
Since we use the PBS Pro scheduler at NCAR, we will use the PBSCluster Dask scheduler from dask-jobqueue. Initialization is similar to a LocalCluster, but with unique parameters specific to creating batch jobs.
import dask
from dask_jobqueue import PBSCluster
from dask.distributed import Client
The parameters of the PBSCluster provide a basic template for the resources that will be assigned to each job…
# Create a PBS cluster object
cluster = PBSCluster(
    job_name = 'dask-wk23-hpc',
    cores = 1,                     # cores per worker (used by Dask)
    memory = '4GiB',               # memory per worker (used by Dask)
    processes = 1,                 # worker processes per PBS job
    local_directory = '/local_scratch/pbs.$PBS_JOBID/dask/spill',  # fast local disk for spill
    resource_spec = 'select=1:ncpus=1:mem=4GB',                    # resources requested from PBS
    queue = 'casper',
    walltime = '30:00',
    interface = 'ext'              # network interface for worker communication
)
Since we are working on a shared system, you may get a port-in-use warning. This is no cause for alarm, but make sure you are not starting a duplicate cluster unintentionally.
We should pause and consider some of these settings…
The cores and memory parameters are used by Dask to define workers, while the resource_spec is used by PBS to define jobs. In this single-worker config, they should match!
PBS uses GB to mean 1024-based storage units. dask-jobqueue accurately calls these GiB.
We use interface='ext' to instruct Dask to use TCP over the high-speed ethernet instead of other, slower, ethernet devices.
Note also that we are using one worker per PBS job. This is a reasonable default on Casper, but it is possible to group multiple workers into one or more PBS jobs as well by increasing cores and ncpus (a sketch of this follows the list below). Here are some considerations:
Using fewer workers per job:
Increases job throughput on most systems (smaller jobs are easier to backfill)
Always avoids interpreter lock issues
Is conceptually easier to understand
May be more robust if the system is unstable
Can speed up file reads in some situations
Using more workers per job:
Reduces overhead in thread-friendly workflows
May allow for slightly higher memory thresholds, since the workers in a job share a memory pool
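As a point of comparison, here is a rough sketch of a cluster that packs four single-threaded workers into each PBS job. The cores, memory, and resource_spec values are illustrative only and would need to match your queue limits.
# Sketch: pack 4 single-threaded workers into each PBS job (values are illustrative)
packed_cluster = PBSCluster(
    job_name = 'dask-wk23-hpc',
    cores = 4,                      # total cores per job, split among workers
    processes = 4,                  # 4 worker processes -> 1 thread each
    memory = '16GiB',               # shared pool, roughly 4GiB per worker
    resource_spec = 'select=1:ncpus=4:mem=16GB',
    queue = 'casper',
    walltime = '30:00',
    interface = 'ext'
)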
It is good practice to validate your cluster before initiating any workers by outputting the job script Dask will create#
print(cluster.job_script())
Note how some settings show up even though we did not set them… where does my account come from, for example?
Let’s take a detour for a moment…
Dask configuration files#
We can customize the behavior of Dask using YAML-based configuration files. These have some advantages:
Eliminate user-specific configuration from your notebooks/scripts
Avoid repetition in defining clusters and other Dask objects
Potentially reduce errors from forgetting important settings
And also some downsides:
Obfuscates settings from others (including your future self!)
Reduces portability and ease of debugging
User configuration files are stored in ~/.config/dask by default. System administrators may also provide default Dask configuration in /etc/dask or via the DASK_ROOT_CONFIG environment variable.
!ls ~/.config/dask
# Programmatically view configuration file(s) within Python
from dask import config
config.refresh()
config.get('jobqueue.pbs')
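If you prefer to keep such settings in code rather than in a YAML file, the same keys can be overridden programmatically. A minimal sketch follows - the account value is a hypothetical project code, and older dask-jobqueue releases use the key project instead of account.
# Sketch: override a few jobqueue.pbs defaults in code (equivalent to editing
# ~/.config/dask/jobqueue.yaml); these become defaults for new PBSCluster objects
config.set({
    'jobqueue.pbs.walltime': '00:30:00',
    'jobqueue.pbs.account': 'UXYZ0001',   # hypothetical project code
})
config.get('jobqueue.pbs.walltime')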
Live Performance Monitoring#
Using dask.distributed provides us with a powerful diagnostic tool you have already seen: the Dashboard. The Dashboard can be integrated into your Jupyter environment in two ways - either as a separate website accessible from the Client widget, or as tabs in your JupyterLab interface via the dask-labextension add-on.
Tip: JupyterLab Dashboard tabs can be saved as a “workspace” and loaded back in future sessions.
Let’s see how both can be used to monitor Dask workers.
# Create the client to load the Dashboard
client = Client(cluster)
# Display the client repr
client
The Dashboard is immediately accessible above when using NCAR’s JupyterHub. This URL can also be entered into the Dashboard extension (click the Dask logo on the left toolbar), which allows you to add useful screens like Task Stream and Workers Memory to your Lab interface.
# Scale the cluster to 2 workers (which will use 2 jobs here)
cluster.scale(2)
# Block progress until workers have spawned (typically only in demos and benchmarks!)
client.wait_for_workers(2)
# See the workers from the cluster object
cluster.workers
# See the workers in the job scheduler
!qstat -u $USER
As soon as we scale the cluster up, the clock is ticking on these PBS jobs. Be mindful of idle workers when using a batch scheduler!
Dashboard demo: multi-file Xarray data analysis#
To demonstrate how the Dashboard can be useful, let’s do some simple analysis of data files using Xarray. Here we load 19 days of GEOS5 data and compute the mean near-surface temperature across the western US.
import xarray as xr
# Use a multi-file import and load data in parallel
ds = xr.open_mfdataset("/glade/collections/rda/data/ds313.0/orig_res/2022/GEOS5_orig_res_202201[0-1]*.nc", parallel = True)
# Show the total size of the variable (this has not been read in yet!)
print("Size of Variable = {:5.2f} GiB".format(ds.T.nbytes / 1024 ** 3))
This data is much too big for our worker template, but as we have seen, the chunks will be smaller. We can check whether they will fit in RAM or cause spilling by querying the data array.
# The graphical repr of one DaskArray - T
ds.T
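If you prefer a programmatic check over reading the repr, a small sketch like this estimates the memory footprint of one chunk of T by multiplying the chunk shape by the element size:
# Estimate the size of the largest chunk of T in MiB
import numpy as np
chunk_bytes = np.prod(ds.T.data.chunksize) * ds.T.dtype.itemsize
print("Largest chunk of T = {:6.2f} MiB".format(chunk_bytes / 1024 ** 2))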
It looks like our data chunks will fit into RAM, but we can verify using the Dashboard. Let’s construct our computation. Here we do the following:
Subset the “western US” from the data via lat/lon slices
Take the mean of temperature values across our western US box
Select the near-surface level (0)
Remember, we are just creating the task graph here. No work will occur yet in our cluster.
# Create our task graph
sfc_mean_graph = ds.T.sel(lon = slice(235, 255), lat = slice(30,50)).mean(["lat","lon"]).isel(lev = 0)
# Tip - double-click the figure to see the actual size
dask.visualize(sfc_mean_graph)
Now, we can use .compute() to start the computation on our cluster. Keep an eye on the dashboard plots to follow progress.
%%time
result = sfc_mean_graph.compute()
result.plot()
Now, let’s see what speedup we can get by manually scaling up our computation by 2x. This is not possible (beyond a certain hardware limit) on a LocalCluster, but is easy to do using dask-jobqueue!
# Scale the cluster to 4 workers
cluster.scale(4)
client.wait_for_workers(4)
# How does this look in PBS?
!qstat -u vanderwb
One downside of scaling is that you end up with a worker pool that has differing amounts of wallclock time remaining. The flow of your script across time is something to consider - if you need to spin up more workers after some of them have used significant walltime, it may make sense to first scale your cluster down to zero (or run client.restart()) and then instantiate the new workers.
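A rough sketch of that pattern, for illustration only (the worker count here is arbitrary and we do not need to run this now):
# Sketch: refresh the worker pool so all jobs begin their walltime together
cluster.scale(0)             # release the existing PBS jobs
# client.restart() is an alternative that also clears any data held by workers
cluster.scale(8)             # request a fresh set of workers
client.wait_for_workers(8)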
Here, we will also demonstrate another type of performance monitoring provided by dask.distributed - the performance report. Using a context manager, we can profile the computational components in the task stream and store the results as an HTML file for future analysis. The report provides a hard-copy record of the computation, though unfortunately it does not capture worker memory usage.
# Let's try generating a "performance report" this time
from dask.distributed import performance_report
%%time
# Since metrics are captured live anyway, the overhead from the report is small
with performance_report(filename="dask-report.html"):
result = sfc_mean_graph.compute()
Hopefully, we see a significant improvement in time-to-solution using 2x the workers.
Another improvement we can make to our computation is to reduce the size of the problem as much as possible before doing meaningful work. Let’s try rearranging our graph:
# Create our improved task graph
sfc_mean_graph = ds.T.sel(lon = slice(235, 255), lat = slice(30,50)).isel(lev = 0).mean(["lat","lon"])
%%time
result = sfc_mean_graph.compute()
Since selecting a single level eliminates more data than subsetting the lat-lon box, doing the level selection first makes the subsequent operations cheaper.
| Indexing | Dimensions | Pts Eliminated |
|---|---|---|
| isel | 721 x 1152 x 71 | 58,972,032 |
| sel | (721 - 81) x (1152 - 81) x 72 | 49,351,680 |
# Can we do better?
sfc_mean_graph = ds.T.isel(lev = 0).sel(lon = slice(235, 255), lat = slice(30,50)).mean(["lat","lon"])
%%time
result = sfc_mean_graph.compute()
In this case, optimizations to the base operation can yield better speed improvements than doubling the dask worker count. Optimize your workflow first, if possible - then parallelize with Dask if still necessary.
Let’s plot our results again to inspect for differences.
result.plot()
Aside: Clusters can also adaptively scale#
For interactive, exploratory work, adaptive scaling can be useful (also very useful on cloud platforms). This allows the cluster to dynamically scale up and down based on the (Dask) scheduler’s estimation of resource needs. This capability is highly customizable, but one basic method would be to set bounds on the number of worker jobs that can be used:
cluster.adapt(minimum=0, maximum=12)
Another benefit of adaptive scaling is that you can use the worker --lifetime argument to tell Dask to cleanly end work on a worker and restart the PBS job. If you stagger the start of your workers, Dask will be able to shuffle tasks appropriately to produce a so-called infinite workload.
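A hedged sketch of that pattern is below. The keyword is worker_extra_args in recent dask-jobqueue releases (older versions used extra), and the lifetime values are illustrative - they should be comfortably shorter than the job walltime.
# Sketch: workers cleanly retire and restart before their PBS walltime expires
lifetime_cluster = PBSCluster(
    cores = 1,
    memory = '4GiB',
    queue = 'casper',
    walltime = '30:00',
    interface = 'ext',
    worker_extra_args = ['--lifetime', '25m', '--lifetime-stagger', '4m'],
)
lifetime_cluster.adapt(minimum=0, maximum=12)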
On busy systems, adaptive scaling can slow down bursty computations because of queue waits between scale-down and scale-up cycles.
Optimization: Persisting data in worker memory#
Sometimes you will need to compute multiple parameters on data from Dask objects. Using .persist() to store intermediate data in worker memory can save computational time if used appropriately. The raw data can be persisted too, of course, but watch out for exhausting worker memory.
Here we compare the time it takes - with and without persisting intermediate results - to compute our level-0 mean, a level-10 mean, and a mean across all model levels.
We will also introduce another diagnostic tool here: the MemorySampler context manager.
from distributed.diagnostics import MemorySampler
ms = MemorySampler()
%%time
# Without persistence
with ms.sample("Original"):
r1 = ds.T.isel(lev = 0).sel(lon = slice(235, 255), lat = slice(30,50)).mean(["lat","lon"]).compute()
r2 = ds.T.isel(lev = 10).sel(lon = slice(235, 255), lat = slice(30,50)).mean(["lat","lon"]).compute()
ra = ds.T.sel(lon = slice(235, 255), lat = slice(30,50)).mean(["lev","lat","lon"]).compute()
%%time
# With persistence
with ms.sample("Persist"):
T_means = ds.T.sel(lon = slice(235, 255), lat = slice(30,50)).mean(["lat","lon"]).persist()
r1 = T_means.isel(lev = 0).compute()
r2 = T_means.isel(lev = 10).compute()
ra = T_means.mean("lev").compute()
Without persisting the intermediate results, Dask does not keep T_means in worker memory, and so the subsetting and mean operations must be redone from scratch for each computation.
Let’s look at the memory usage…
ms.plot(align=True)
Because the original case has to recreate the intermediate results for each calculation anyway, persisting them does not even increase memory usage. A clear win!
Of course, when persisting data it is extra important to clean up. Running del on the client-side variable that references the persisted data will clear it from worker memory (as long as it is not referenced by other variables). The progress dashboard is a useful reminder that we have data persisted.
del T_means
# Close current workers
cluster.scale(0)
Debugging workers case study: memory and spill-to-disk#
In this section, we will demonstrate two common considerations when using Dask on HPC:
Dask data spilling to disk
Interacting with dask.distributed worker logs
For this case study, we will generate progressively larger Dask arrays that eventually trigger memory conditions. Dask workers handle data in different ways in the following memory regimes:
| Suggested Threshold | Case Study Value | Worker Behavior |
|---|---|---|
| 0.6 (managed mem) | 2.4 GB | Data is allocated on disk (spill) |
| 0.7 (process mem) | 2.8 GB | Data is allocated on disk (spill) |
| 0.8 | 3.2 GB | New data allocation is paused |
| 0.95 | 3.8 GB | Worker is killed to avoid OOM |
These thresholds can be set at cluster creation time or overridden by your Dask Distributed configuration file.
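For reference, these fractions map onto keys in the distributed.worker.memory section of the configuration. A sketch of overriding them (before any workers are created), using the suggested defaults shown above:
# Sketch: adjust the spill/pause/terminate thresholds via Dask configuration
# (fractions are of each worker's memory limit; set before workers start)
import dask
dask.config.set({
    'distributed.worker.memory.target': 0.60,      # spill managed data above this
    'distributed.worker.memory.spill': 0.70,       # spill based on process memory
    'distributed.worker.memory.pause': 0.80,       # pause new task execution
    'distributed.worker.memory.terminate': 0.95,   # kill the worker to avoid OOM
})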
import dask.array as da
from distributed.worker import logger
It is possible to write directly to worker logs (PBS job logs in our case) using the worker logger from Dask Distributed. Here, we define a function to call the logger on each worker, which we will run eagerly via client.run.
Keep an eye on the worker memory Dashboard panel as our for loop proceeds…
def log_message(chunk_size):
logger.info("Current chunk size = {} MiB".format(chunk_size))
# Start up 4 new workers
cluster.scale(4)
client.wait_for_workers(4)
for chunk_mib in [1600, 2400, 3200, 3900]:
    client.run(log_message, chunk_mib)
    # Convert MiB to a number of float64 elements (8 bytes each)
    chunk_size = chunk_mib * 1024 * 1024 // 8
    print("Mean of {} MiB random array = {:0.2f}".format(chunk_mib, da.random.random(chunk_size * 4, chunks=chunk_size).mean().compute()))
# List the most recent 4 worker logs - these should be our logs
!ls -lrt dask-worker-logs | tail -n 4
We can open the log file in the Lab interface or a terminal and investigate the reason for the KilledWorker exception.
# Let's look at the worker state in PBS after the failure
!qstat -u vanderwb
Notice how the workers retried the final computation a few times before giving up. This behavior occurs when using the nanny, which attempts to restore workers when they are killed by various exceptions. (The nanny is an additional lightweight process that monitors the health of the worker and enables functionality like a full cluster restart.)
Recall the aforementioned memory thresholds - workers are killed by Dask before exceeding the absolute memory limit of a job (which, if hit on certain systems, could kill the PBS job too). Because of this safety mechanism, our PBS jobs are still intact.
If you are running a long computation that, if restarted, could exhaust the worker job’s walltime, you can disable the nanny functionality and make exceptions fatal.
# Shut down our client (thus terminating workers)
client.shutdown()
Analyzing your allocation#
Dask does not provide integrated tools for analyzing the impact of a workflow on your allocation (though you could back out values with care). This task is best left to the scheduler itself, assuming that you have carefully instantiated your workers.
Consider this value from our cluster config:
cluster.job_name
This means that every Dask worker I start via this workflow’s PBSCluster will have the job name dask-wk23-hpc. We can leverage this along with our qhist utility to query the scheduler logs:
qhist -u $USER -N dask-wk23-hpc -f numcpus,elapsed -c > qhist-dask.out
This command will query all dask-worker jobs in today’s scheduler logs and output CSV, which we redirect to a file. The job name field is a powerful tool: if you use a worker name specific to each script, you can easily query only the jobs from that script. Confusingly, if you wish to set the job name when creating your cluster object, use the job_name parameter, not name!
We can then read in the CSV using pandas (or even using a Dask Dataframe!).
import pandas as pd
dj = pd.read_csv("../data/qhist-dask.out")
dj.head()
print("Core-hours used by this notebook: {:.2f}".format(sum(dj['NCPUs'] * dj['Elapsed (h)'])))
Records from qhist span the time that PBS has been used on each system, so with a bit of forward-thinking prep work (picking descriptive worker names), you can easily trace back usage.
Additional Considerations#
As we’ve shown, Dask is flexible and highly configurable. While we will not cover the following topics in depth, we encourage you to explore further on your own (and let us know in the survey if you would like to learn more!).
Dask Worker Count#
You may be wondering how to choose the number of workers. This question is tricky and can often depend on the state of the machine at any time. Here are some absolutes:
Use more than a single worker unless debugging or profiling
Do not use more workers than you have chunks - they will be idle
And here are some guidelines:
If you have to choose between more workers vs. more memory per worker, let the chunk size be your guide (more on this in the next notebook)
In general, requesting fewer workers with more memory will take longer to get through the queue than more workers with less memory; typically memory is more constrained than CPU cores on analysis machines
Using adaptive scaling will make your workflow throughput less sensitive to the state of the HPC jobs queue
Using Dask on GPUs#
Much like Xarray can use NumPy arrays or Dask arrays, Dask itself can use NumPy arrays or CuPy arrays - the latter of which are GPU-enabled on both NVIDIA and AMD hardware. For NVIDIA users, the RAPIDS suite offers cuDF - a drop-in replacement for pandas DataFrames which can also be used with Dask. And efforts are underway to effectively use GPUs with Xarray and Dask.
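As a taste, here is a minimal sketch - assuming CuPy is installed and a GPU is available - of backing a Dask array with CuPy chunks:
# Sketch: move a Dask array's chunks from NumPy to CuPy (GPU) memory
import cupy
import dask.array as da

x = da.random.random((20000, 20000), chunks=(2000, 2000))   # NumPy-backed chunks
xg = x.map_blocks(cupy.asarray)                              # convert each chunk to CuPy
print(xg.mean().compute())                                   # the reduction runs on the GPU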
For a starting point, check out the Dask with GPUs tutorial CISL offered in Summer 2022.