
Dask Schedulers#

ESDS dask tutorial | 06 February, 2023

Negin Sobhani, Brian Vanderwende, Deepak Cherian, Ben Kirk
Computational & Information Systems Lab (CISL)
negins@ucar.edu, vanderwb@ucar.edu


In this tutorial, you learn:#

  • Components of Dask Schedulers

  • Types of Dask Schedulers

  • Single Machine Schedulers

Related Documentation

Introduction#

As we mentioned in our Dask overview, Dask is composed of two main parts:

  1. Dask Collections (APIs)

  2. Dynamic Task Scheduling

So far, we have talked about different Dask collections, but in this tutorial we are going to talk more about the second part.

The Dask scheduler - our task orchestrator#

The Dask.distributed task scheduler is a centralized, dynamic system that coordinates the work of Dask worker processes spread across different machines.

When a computational task is submitted, the Dask distributed scheduler sends it off to a Dask cluster - simply a collection of Dask workers. A worker is typically a separate Python process on either the local host or a remote machine.

  • worker - a Python interpreter that performs work on a subset of our dataset.

  • cluster - an object containing instructions for starting and talking to workers.

  • scheduler - sends tasks from our task graph to workers.

  • client - a local object that points to the scheduler (often local but not always).
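The division of labor in the list above can be sketched without Dask at all. The following is a minimal toy, not Dask's implementation: a task graph written as a plain dictionary is handed to a small function that plays the scheduler, and a pool of threads plays the workers. The names `toy_schedule`, `inc`, and `add` are hypothetical, and the sketch assumes every task argument is a key in the graph.

```python
from concurrent.futures import ThreadPoolExecutor


def inc(x):
    return x + 1


def add(x, y):
    return x + y


# A task graph as a dictionary: key -> literal value or (function, *argument_keys)
graph = {
    "a": 1,
    "b": (inc, "a"),
    "c": (inc, "a"),
    "d": (add, "b", "c"),
}


def toy_schedule(graph, target, n_workers=2):
    """Toy scheduler: send every task whose inputs are ready to a worker thread."""
    results = {k: v for k, v in graph.items() if not isinstance(v, tuple)}
    pending = {k: v for k, v in graph.items() if isinstance(v, tuple)}
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        while pending:
            # Tasks whose argument keys have all been computed already
            ready = [k for k, (_, *args) in pending.items()
                     if all(a in results for a in args)]
            if not ready:
                raise ValueError("graph has a cycle or refers to a missing key")
            futures = {
                k: pool.submit(pending[k][0], *(results[a] for a in pending[k][1:]))
                for k in ready
            }
            for k, future in futures.items():
                results[k] = future.result()
                del pending[k]
    return results[target]


result = toy_schedule(graph, "d")
print(result)
```

Here "b" and "c" do not depend on each other, so they are submitted to the pool in the same round, while "d" has to wait for both. A real scheduler makes the same kind of decision continuously and for far larger graphs.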

Schedulers#

Dask essentially offers two types of schedulers:

1. Single machine scheduler#

  • The Single-machine Scheduler schedules tasks and manages the execution of those tasks on the same machine where the scheduler is running.

  • It is designed to be used in situations where the amount of data or the computational requirements are too large for a single process to handle, but not large enough to warrant the use of a cluster of machines.

  • It is relatively simple and cheap to use but it does not scale as it only runs on a single machine.

The single-machine scheduler is Dask's default choice.

Dask provides several single-machine schedulers:

1.1. Single-threaded scheduler#

This scheduler runs all tasks serially on a single thread.
It offers no parallelism, so it is useful only for debugging and profiling.

1.2. Threaded scheduler#

The threaded scheduler uses a pool of local threads to execute tasks concurrently.
This is the default scheduler for Dask, and is suitable for most use cases on a single machine. Multithreading works well for Dask Array and Dask DataFrame.

To select one of the above schedulers for a computation, pass the scheduler argument to .compute().

For example, where this stands for any Dask collection:

this.compute(scheduler="single-threaded")  # for debugging and profiling only

As mentioned above, the threaded scheduler is the default scheduler in Dask. You can change the default to the single-threaded or multiprocessing scheduler with dask.config.set:

import dask
dask.config.set(scheduler='synchronous')  # overwrite default with single-threaded scheduler
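The two ways of choosing a scheduler can be compared side by side. This is a minimal sketch, assuming a small delayed computation invented for illustration (the `square` function and the `total` graph are not part of the tutorial). It also uses dask.config.set as a context manager, which limits the change to the enclosed block instead of the whole session.

```python
import dask


@dask.delayed
def square(x):
    return x * x


# A small lazy computation: the sum of the squares of 0..4
total = dask.delayed(sum)([square(i) for i in range(5)])

# Option 1: choose the scheduler for a single call
r1 = total.compute(scheduler="synchronous")

# Option 2: change the default only inside this block
with dask.config.set(scheduler="threads"):
    r2 = total.compute()

print(r1, r2)
```

Both calls evaluate the same graph, so the results are identical; only the way the tasks are executed differs.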

The multiprocessing scheduler works well for pure Python code that holds the GIL, such as delayed functions and operations on Dask Bags.

Let’s compare the performance of each of these single-machine schedulers:

Distributed Clusters#

import numpy as np
import dask.array as da

%%time
# -- NumPy performance
xn = np.random.normal(10, 0.1, size=(20_000, 20_000))
yn = xn.mean(axis=0)
yn

CPU times: user 14.9 s, sys: 1.32 s, total: 16.2 s
Wall time: 16.1 s
array([ 9.99987393,  9.99942047, 10.00069322, ...,  9.99997333,
        9.99945909, 10.00094973])

%%time
# -- dask array using the default
xd = da.random.normal(10, 0.1, size=(20_000, 20_000), chunks=(2000, 2000))
yd = xd.mean(axis=0)
yd.compute()

CPU times: user 14.8 s, sys: 112 ms, total: 14.9 s
Wall time: 3.83 s
array([ 9.99928454,  9.99968075, 10.00027327, ..., 10.00030439,
        9.9999113 ,  9.99947802])

import time
# -- dask testing different schedulers:
for sch in ['threading', 'processes', 'sync']:
    t0 = time.time()
    r = yd.compute(scheduler=sch)
    t1 = time.time()
    print(f"{sch:>10} :  {t1 - t0:0.4f} s")

 threading :  3.7886 s
 processes :  5.2656 s
      sync :  14.7481 s

yd.dask

HighLevelGraph

HighLevelGraph with 4 layers and 240 keys from all layers.

Layer1: normal

normal-6ad96170c4c61710dbc18b74e58c3cb2

layer_type MaterializedLayer
is_materialized True
number of outputs 100
shape (30000, 30000)
dtype float64
chunksize (3000, 3000)
type dask.array.core.Array
chunk_type numpy.ndarray

Layer2: mean_chunk

mean_chunk-1ccb39e699989873e56e0c577ab50469

layer_type Blockwise
is_materialized True
number of outputs 100
shape (30000, 30000)
dtype float64
chunksize (3000, 3000)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on normal-6ad96170c4c61710dbc18b74e58c3cb2

Layer3: mean_combine-partial

mean_combine-partial-9d7408b2918b12ded83c71dd3ff57f3e

layer_type MaterializedLayer
is_materialized True
number of outputs 30
shape (3, 30000)
dtype float64
chunksize (1, 3000)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on mean_chunk-1ccb39e699989873e56e0c577ab50469

Layer4: mean_agg-aggregate

mean_agg-aggregate-8bc993aec3b4d7a1fc433f965f460473

layer_type MaterializedLayer
is_materialized True
number of outputs 10
shape (30000,)
dtype float64
chunksize (3000,)
type dask.array.core.Array
chunk_type numpy.ndarray
depends on mean_combine-partial-9d7408b2918b12ded83c71dd3ff57f3e

yd

        Array            Chunk
Bytes   234.38 kiB       23.44 kiB
Shape   (30000,)         (3000,)
Count   4 Graph Layers   10 Chunks
Type    float64          numpy.ndarray

  • Notice how the sync scheduler takes almost the same time as pure NumPy code.

  • Why is the multiprocessing scheduler so much slower?

With the multiprocessing scheduler, worker processes are isolated from one another, so all communication between them must pass through the main process, and data must be serialized each time it crosses a process boundary. This introduces a large overhead.
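One part of that overhead can be made visible with the standard library alone. The sketch below is an illustration, not a measurement of Dask itself: it times a pickle round trip for a list that stands in for one task's intermediate result. Threads share memory and skip this step entirely, while separate processes cannot. The payload size is an arbitrary assumption.

```python
import pickle
import time

# Stand-in for an intermediate result produced by one task
payload = list(range(1_000_000))

t0 = time.perf_counter()
blob = pickle.dumps(payload)   # serialize to bytes to cross a process boundary
restored = pickle.loads(blob)  # rebuild the object on the receiving side
elapsed = time.perf_counter() - t0

print(f"serialized size: {len(blob) / 1e6:.1f} MB, round trip: {elapsed:.3f} s")
```

This cost is paid every time data moves between processes, which is why a computation made of many small, fast tasks can end up slower with processes than with threads.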

The Dask developers recommend using the Dask Distributed Scheduler, which we will cover now.

2. Distributed scheduler#

  • The distributed scheduler, dask.distributed, schedules tasks and manages their execution on workers running on one or more machines.

  • This scheduler is more sophisticated and offers more features, including a diagnostic dashboard that provides live insight into the performance and progress of calculations.

In most cases, dask.distributed is preferred because it is very scalable, provides an informative interactive dashboard, and gives access to more complex Dask collections such as futures.

2.1. Local Cluster#

A Dask Local Cluster refers to a group of worker processes that run on a single machine and are managed by a single Dask scheduler.

This is useful for situations where the computational requirements are not large enough to warrant the use of a full cluster of separate machines. It provides an easy way to run parallel computations on a single machine, without the need for complex cluster management or other infrastructure.

Let’s start by creating a Local Cluster#

For this we need to set up a LocalCluster using dask.distributed and connect a client to it.

from dask.distributed import LocalCluster, Client

cluster = LocalCluster()
client = Client(cluster)
client

Client

Client-528e046a-a5a4-11ed-928c-3cecef1b11fa

Connection method: Cluster object Cluster type: distributed.LocalCluster
Dashboard: https://jupyterhub.hpc.ucar.edu/stable/user/negins/casper_16_4/proxy/8787/status


☝️ Click the Dashboard link above.

👈 Or click the “Search” 🔍 button in the dask-labextension dashboard.

If no arguments are passed to LocalCluster, it automatically detects the number of CPU cores and the amount of memory on your system and creates workers to fill them appropriately.

A LocalCluster will use the full resources of the current JupyterLab session. For example, if you used NCAR JupyterHub, it will use the number of CPUs selected.

Note that LocalCluster() takes a lot of optional arguments, allowing you to configure the number of processes/threads, memory limits and other settings.
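As a hedged sketch of what such a configuration might look like, the example below sets a few of those arguments explicitly. All values are illustrative assumptions, not recommendations: two workers with one thread each and a 1 GiB memory limit per worker. It also uses processes=False, which runs the workers as threads inside the current process so the sketch works in a plain script, and dashboard_address=None, which disables the dashboard for this short run.

```python
from dask.distributed import Client, LocalCluster

# Illustrative settings only; choose values that match your own session
with LocalCluster(
    n_workers=2,
    threads_per_worker=1,
    processes=False,
    memory_limit="1GiB",
    dashboard_address=None,
) as cluster:
    with Client(cluster) as client:
        # Ask the scheduler how many workers it knows about
        n_workers = len(client.scheduler_info()["workers"])
        # Submit one task as a future and wait for its result
        total = client.submit(sum, range(10)).result()

print(n_workers, total)
```

Using the cluster and the client as context managers closes both automatically at the end of the block, even if the computation raises an error.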

You can also find your cluster dashboard link using:

cluster.dashboard_link

'https://jupyterhub.hpc.ucar.edu/stable/user/negins/casper_16_4/proxy/8787/status'

%%time
# -- dask array using the default
xd = da.random.normal(10, 0.1, size=(30_000, 30_000), chunks=(3000, 3000))
yd = xd.mean(axis=0)
yd.compute()

CPU times: user 499 ms, sys: 142 ms, total: 641 ms
Wall time: 10.1 s
array([10.00024901, 10.00025024, 10.00001342, ..., 10.00006029,
        9.99957823, 10.00021491])

Always remember to close your local Dask cluster:

client.shutdown()

Dask Distributed (Cluster)#

So far we have talked about running a job on a local machine.

Dask can be deployed on distributed infrastructure, such as an HPC system or a cloud computing system.

Dask clusters have different names corresponding to different computing environments. Some examples are dask-jobqueue for HPC systems (which includes PBSCluster) and Kubernetes Cluster for machines in the cloud.

In section 5, we will talk more about Dask on HPC Systems.