Stream: dask
Topic: Using dask on casper with geocat
Brian Medeiros (Oct 04 2021 at 16:49):
I'd like to use geocat's interp_hybrid_to_pressure function on a large series of simulations, but it seems very slow. I noticed that its output is a dask object, so I figured this was a good opportunity to try to learn how to use dask better on casper. Unfortunately, my attempt has totally failed. Using a single dataarray, the function no longer even completes the interpolation step (it takes about a minute without setting up the dask cluster). I tried to follow the example for setting up dask from https://ncar.github.io/esds/posts/2021/casper_pbs_dask/ and upped the resources to 6 cores, 128GB memory, and 12 processes. Here's how I tried to set up the cluster:
# Import dask
import dask
# Use dask jobqueue
from dask_jobqueue import PBSCluster
# Import a client
from dask.distributed import Client

# Setup your PBSCluster
cluster = PBSCluster(
    cores=6,                                     # The number of cores you want
    memory='128GB',                              # Amount of memory
    processes=12,                                # How many processes
    queue='casper',                              # The type of queue to utilize (/glade/u/apps/dav/opt/usr/bin/execcasper)
    local_directory='$TMPDIR',                   # Use your local directory
    resource_spec='select=1:ncpus=6:mem=128GB',  # Specify resources
    project='P03010039',                         # Input your project ID here
    walltime='02:00:00',                         # Amount of wall time
    interface='ib0',                             # Interface to use
)
client = Client(cluster)

# Setup your PBSCluster - make sure that it uses the casper queue
cluster = PBSCluster(queue='casper')
My script is here: /glade/u/home/brianpm/example_dask_geocat.py
Anyone have an idea where I've gone off the rails here?
Isla Simpson (Oct 04 2021 at 17:10):
Hi Brian, I'm not sure I can be helpful in seeing what's going on, but I have been using interp_hybrid_to_pressure with some success. I was trying to do things quickly, and rather than using geocat I just pasted the function into my notebooks. An example is here:
/glade/u/home/islas/scripts/setupSNAP/postprocess/6hrZ/postprocessdata_6hrZ_3D.ipynb. It does seem slow compared to NCL, but it does actually complete and is not terribly slow. Glad to hear if there are more efficient ways, or if there's something that I have set here that makes a difference.
Matt Long (Oct 04 2021 at 17:13):
cc @geocat
Brian Medeiros (Oct 04 2021 at 17:21):
Thanks, @Isla Simpson . Yeah, the function itself works okay. My backup plan is to use the version of the function that I had passed on to the geocat folks and wrap it using Numba, which makes it much faster by compiling the function. I'm hoping that I can someday figure out how to get dask to speed up code (so far my experience seems to go in the opposite direction).
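Something like this minimal sketch is what I have in mind for the Numba route (the kernel below is an illustrative stand-in, not the actual geocat function):

import numpy as np
from numba import njit

@njit
def interp_column_to_pressure(p_col, x_col, new_levels):
    # Interpolate one column x_col, defined on increasing pressures p_col,
    # onto new_levels. Numba compiles this on first call, so looping it
    # over many columns runs much faster than pure Python.
    return np.interp(new_levels, p_col, x_col)

# One synthetic column as a smoke test
p_col = np.linspace(1000.0, 100000.0, 32)          # Pa, top to bottom
x_col = np.cos(p_col / 30000.0)                    # fake field values
new_levels = np.arange(100000.0, 5000.0, -2500.0)  # target pressure levels
x_plev = interp_column_to_pressure(p_col, x_col, new_levels)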
Orhan Eroglu (Oct 04 2021 at 17:22):
@Brian Medeiros @Isla Simpson these are great cases for us to test interp_hybrid_to_pressure with real data sets. I will try to work on both of your cases with @Anissa Zacharias, who did a lot of Dask investigation recently, once both of us have some time. That way we can see if there are bottlenecks in our function flow and/or Dask use. Will keep you all updated.
Katie Dagon (Oct 04 2021 at 20:49):
@Brian Medeiros Is it possible you need to add a cluster.scale command before the Client(cluster) command? This is something I do following the examples I've seen. I think it relates to asking for a certain number of dask workers, but I'm not always sure how to set the number. I also noticed you have a second cluster = PBSCluster() command at the bottom of your code example above - might this be resetting the previous PBSCluster command?
When using dask in a Jupyter notebook, I have also found the dask dashboard very helpful for verifying that the workers are ready and for monitoring progress.
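Something like this ordering is what I mean (the worker count here is just a guess; I've also set processes equal to cores so each worker gets at least one thread):

from dask_jobqueue import PBSCluster
from dask.distributed import Client

cluster = PBSCluster(cores=6, memory='128GB', processes=6,
                     queue='casper', local_directory='$TMPDIR',
                     resource_spec='select=1:ncpus=6:mem=128GB',
                     project='P03010039', walltime='02:00:00', interface='ib0')
cluster.scale(6)          # request 6 workers (one PBS job here) before attaching a client
client = Client(cluster)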
Orhan Eroglu (Oct 07 2021 at 16:21):
Hi @Brian Medeiros and maybe @Isla Simpson ,
@Anissa Zacharias and I worked on Brian's code and were able to get it running successfully in about 25 seconds on Casper with Dask. I think that is a good wall time for such a big data set (let us know your thoughts though), but it could be made a bit better if we play more with the chunk sizes etc.
I'd like to share a few of our findings on your code below, and I am happy to have a video chat with you and others in this topic to go through the details and get your end up and running (e.g. looking into your personal Dask configs on glade etc.):
- First of all, you can find two notebooks with your code corrected (Anissa.ipynb and Orhan.ipynb) at /glade/u/home/oero/src/brian_issue_interp_hybrid_to_sigma. What we did as corrections in those notebooks is itemized below:
- We agree with @Katie Dagon 's suggestions: In your code, you already "Setup your PBSCluster" in the larger block, with several resource parameters determined by you, but then your last line of code (cluster = PBSCluster(queue='casper')) discards whatever cluster you already set up in that larger block. That final line needs to be removed. In addition, we recommend using cluster.scale, too.
- You may also consider using NCARCluster instead of PBSCluster to set up your cluster, as we did in the notebooks. It is more a convenience than a technical difference under the hood. To use NCARCluster, you will need to set up a config file for yourself as described here (see the setup sketch after this list).
- Whatever you are using to set up your cluster/client, you always need to check first that your cluster and client have been set up successfully. You can do this by calling client after setting up your cluster/client; it will show you the client info, the Dask dashboard link, the number of workers, etc., confirming that your cluster/client is up and ready for Dask calculations (see the same sketch after this list). I am happy to walk you through this in a video chat.
- Dask setup is sometimes ambiguous, and if you don't check your cluster/client as in the previous bullet point, you may think your function is stuck when the reality is just that you don't have your client up, or your workers haven't been allocated the resources you requested, etc. For example, the latter is the case I am currently facing on my end: for some reason, I cannot get the resources allocated that I request for my NCARCluster setup. I am using all the same configs as @Anissa Zacharias 's, but I haven't been able to get resources for my notebook on Casper yet. I may need further support from someone else here.
- Once you have your cluster/client up, you should be good to successfully run interp_hybrid_to_pressure, as we were able to do on @Anissa Zacharias 's end.
- There are some other details we played with in your code:
  - For the large data arrays (such as ocld and maybe e_ps in yours), using .persist() as we did in your code can make the calculations efficient in distributed environments such as Casper (see the persist sketch after this list).
  - In your code, you were chunking the output (ocld_plev) of the function. That will only benefit you if you use that output as an input to another function. Instead, chunking your large input arrays before feeding them into the function you run (i.e. interp_hybrid_to_pressure in this case) lets you benefit from Dask's distributed performance. Please see below:
ocld_chunked = ocld.chunk({'time': 10, 'lat': 192, 'lon': 288})
ocld_plev = gc.interp_hybrid_to_pressure(ocld_chunked, e_ps, hyam, hybm, p0=pref,
                                         new_levels=np.arange(100000., 5000., -2500),
                                         method='linear')  # .compute()
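For example, a minimal setup-and-check sketch along the lines of the NCARCluster and client bullets above (assuming the ncar-jobqueue package is installed and your config file is in place; the worker count is illustrative):

from ncar_jobqueue import NCARCluster
from dask.distributed import Client

# NCARCluster picks up queue/resource defaults from your Dask config file
cluster = NCARCluster(project='P03010039')
cluster.scale(4)             # request 4 workers
client = Client(cluster)
client.wait_for_workers(4)   # block until the workers are actually allocated

# In a notebook, just evaluating `client` shows the dashboard link and
# worker count; in a script you can print it instead
print(client)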
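And a short sketch of the chunk-then-persist pattern from the last bullet (the file name and dataset variable names here are placeholders following your script, not your actual data):

import numpy as np
import xarray as xr
import geocat.comp as gc

ds = xr.open_dataset('model_output.nc')  # placeholder path
hyam, hybm = ds['hyam'], ds['hybm']
pref = float(ds['P0'])

# Chunk the large inputs up front, then persist them so the chunks are
# computed once and held in distributed worker memory for the interpolation
ocld = ds['OCLD'].chunk({'time': 10, 'lat': 192, 'lon': 288}).persist()
e_ps = ds['PS'].chunk({'time': 10}).persist()

ocld_plev = gc.interp_hybrid_to_pressure(ocld, e_ps, hyam, hybm, p0=pref,
                                         new_levels=np.arange(100000., 5000., -2500),
                                         method='linear')
ocld_plev = ocld_plev.compute()  # trigger the actual computation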
We hope this helps. Let us know if you need to further discuss anything and/or work with us to set up your end.
Brian Medeiros (Oct 07 2021 at 17:12):
Thanks @Orhan Eroglu and @Anissa Zacharias !! There's a lot here; I'll go through it in detail as soon as I can. I'll be in touch if I have follow-up questions.