An Example of Using Intake-ESM#

This past week, NCAR CISL updated the Casper node to use PBS instead of Slurm for scheduling jobs. This led to a post with an example of spinning up dask clusters on the new configuration. It was also an opportunity to dig into dask and apply it to a sample task, specifically looking at ecosystem variables in the CESM-LE dataset, using notebooks included in Matt Long’s krill-cesm-le repository, modified by Kristen Krumhardt.

Setting up the imports/dask#

%matplotlib inline
import os
import shutil

from glob import glob

import cftime

import numpy as np
import xarray as xr

import matplotlib.pyplot as plt
import matplotlib.colors as colors

import cartopy.crs as ccrs
from cartopy.util import add_cyclic_point

import intake
import pop_tools
import esmlab
import util  # local utility module included with the krill-cesm-le notebooks

import warnings
warnings.filterwarnings('ignore')

Spin up Dask Cluster#

Here, we spin up our dask cluster. At first, running this notebook resulted in a killed-worker error. After further inspection, we noticed that additional resources would be needed to read in the data since they are so large (on the order of ~1-2 TB). Increasing the memory per worker (e.g., to 256 GB) solved the issue. Scale up to as many workers as you think are necessary for the calculation (this may take some trial and error).

You should base your chunks on your analysis. In this case, we are interested in the top 100 m of the ocean, so chunking along z_t, the vertical coordinate, will help reduce memory usage.

Good general advice for working with dask is to keep an eye on the dask dashboard (accessed via the URL shown when you display the client). When regions of the memory panel turn orange, your workers are getting close to maxing out on memory.

import dask

# Use dask jobqueue
from dask_jobqueue import PBSCluster

# Import a client
from dask.distributed import Client

# Set up your PBSCluster
cluster = PBSCluster(
    cores=2, # Number of cores per job
    memory='256 GB', # Memory per job
    processes=1, # Number of dask worker processes per job
    queue='casper', # The queue to submit to (/glade/u/apps/dav/opt/usr/bin/execcasper)
    local_directory='$TMPDIR', # Directory for local worker storage (spill files)
    resource_spec='select=1:ncpus=2:mem=256GB', # Resource specification passed to PBS
    project='project_id', # Input your project ID here
    walltime='02:00:00', # Requested wall time
    interface='ib0', # Network interface to use (InfiniBand on Casper)
)
# Scale up
cluster.scale(4)

# Point the dashboard link at the JupyterHub proxy so it can be opened in your browser
dask.config.set({'distributed.dashboard.link':'https://jupyterhub.hpc.ucar.edu/stable/user/{USER}/proxy/{port}/status'})

# Set up your client
client = Client(cluster)
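
As noted above, the dashboard is the easiest way to watch worker memory. If you just want the URL itself rather than the full client repr, dask.distributed's client.dashboard_link returns it (a small convenience, not part of the original notebook):

# Print the dashboard URL so it can be opened in a browser
print(client.dashboard_link)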

Read the CESM-LE data#

We will use intake-esm, a data catalog tool. It lets us query a catalog for the files we want, then load them directly as an xarray.Dataset.

The first step is to open the “collection” for the CESM-LE, which depends on a JSON file conforming to the ESM Catalog Specification.

catalog_file = '/glade/u/home/kristenk/TTE_CESM-LE/krill-cesm-le/notebooks/data/glade-cesm1-le.json'
variables = ['TEMP', 'diatC']

experiments = ['20C', 'RCP85']
stream = 'pop.h'

col = intake.open_esm_datastore(catalog_file)

Now we search the collection for the ensemble members (unique member_ids) that have a chlorophyll field (diatChl). This is necessary because the ocean biogeochemistry output was corrupted in some members and the data were deleted.

In this cell, member_id is a list of the ensemble members we want to operate on.

col_sub = col.search(experiment=['20C'],
                     stream='pop.h',
                     variable=['diatChl'])

member_id = list(col_sub.df.member_id.unique())
print(member_id)

Apply the search for data#

Specify a list of variables and perform a search. Under the hood, the search functionality uses pandas DataFrames. We can view the underlying frame using the .df attribute.

col_sub = col.search(
    experiment=experiments,
    stream=stream,
    variable=variables,
    member_id=member_id,
    )

print(col_sub)

col_sub.df.head()

Use .to_dataset_dict to read in the datasets as a dictionary, specifying the chunk sizes. This is also a good point to check the chunk size of the dask arrays and make sure each chunk is a reasonable size (~100-200 MB). Test it out on one of the variables (e.g., TEMP).

Since we are only interested in the top 100 m, we chunk z_t in blocks of 10 levels, which should reduce memory usage.

dsets = col_sub.to_dataset_dict(cdf_kwargs={'chunks': {'time':5, 'z_t':10}, 'decode_times': False})
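
It is worth confirming that the resulting chunks land in the ~100-200 MB range mentioned above. A minimal check on TEMP (the key below is simply the first one in the dictionary; pick whichever dataset you care about):

# Inspect the chunk layout and the approximate size of each chunk of TEMP
key = list(dsets.keys())[0]
temp = dsets[key].TEMP
print(temp.chunks)  # chunk sizes along each dimension
print(f'~{np.prod(temp.data.chunksize) * temp.dtype.itemsize / 1e6:.0f} MB per chunk')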

Apply an operation#

Apply a calculation. In this case, we compute the average temperature within the top 100 m of the ocean; in the standard POP grid the uppermost levels are each 10 m thick, so the first 10 z_t levels cover the top 100 m.

def compute_TEMP_100m(ds):
    """compute top 100m mean temperature"""

    ds['TEMP_100m_mean'] = ds.TEMP.isel(z_t=slice(0,10)).mean(dim='z_t')
    ds.TEMP_100m_mean.attrs = ds.TEMP.attrs
    ds.TEMP_100m_mean.attrs['long_name'] = 'Mean temperature over top 100m'

    return ds.drop(['TEMP'])

Now apply this function to each of the datasets.

# compute top 100m temperature
dsets2 = {key: compute_TEMP_100m(ds) for key, ds in dsets.items()}
print('computed top 100m temp')

Combine the datasets#

Concatenate the datasets in time, i.e. 20C + RCP8.5 experiments.

ordered_dsets_keys = ['ocn,20C,pop.h', 'ocn,RCP85,pop.h']
#ordered_dsets_keys = ['ocn.20C.pop.h', 'ocn.RCP85.pop.h']
ds = xr.concat(
    [dsets2[exp] for exp in ordered_dsets_keys],
    dim='time',
    data_vars='minimal',
    # compat='override' can be passed to skip comparing variables that are not being concatenated
)
time_encoding = dsets2[ordered_dsets_keys[0]].time.encoding
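
The time encoding is stored so it can be put back on the time coordinate later (for example, before writing the output) to keep the original units and calendar. That re-assignment is not shown in the original notebook; a minimal sketch:

# Reattach the saved encoding so the time coordinate is written with the
# original units/calendar (illustrative; not part of the original notebook)
ds['time'].encoding = time_encoding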

Calculate an annual mean#

Create an annual mean for temperature, using a utility function that handles the time coordinate correctly.

ds_ann = util.ann_mean(ds, time_bnds_varname='time_bound', time_centered=True)
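
util.ann_mean comes from the krill-cesm-le repository. For readers without that module, the sketch below shows a generic (not equivalent) month-length-weighted annual mean in plain xarray; it assumes the time coordinate has been decoded to datetime/cftime objects, which is not the case here (decode_times=False), so treat it purely as an illustration of the approach:

# Illustrative only -- NOT the actual util.ann_mean
def weighted_annual_mean(da):
    """Weight each month by its length, then sum within each year."""
    month_length = da.time.dt.days_in_month
    weights = month_length.groupby('time.year') / month_length.groupby('time.year').sum()
    return (da * weights).groupby('time.year').sum(dim='time')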

Write the dataset to disk#

Here, we are just looking to write out a few of the variables.


# Create a list of variables to write out
keep_vars = ['time_bound', 'TAREA', 'time', 'dz', 'KMT', 'member_id', 'TLAT', 'TLONG', 'TEMP_100m_mean']

# Subset for these variables
ds_out = ds_ann[keep_vars]

# Run the computations (assign the result; .compute() returns a new, in-memory dataset)
ds_out = ds_out.compute()

# Write out to scratch since this is likely a large file
ds_out.to_netcdf('/glade/scratch/username/CESM-LE-output/CESM-LE-TEMP_100m_mean.nc')

While this is a rather specific example, I hope it illustrates how useful intake-esm can be, and the power of dask for performing computationally expensive calculations on large datasets.