
Access CESM2 LENS data from NCAR’s Geoscience Data Exchange (GDEX) and compute GMST

Section 1: Introduction

  • Python package imports and useful function definitions
# Display output of plots directly in the notebook
%matplotlib inline
import warnings
warnings.filterwarnings("ignore")

import intake
import numpy as np
import xarray as xr
import nc_time_axis  # registers cftime datetime support for matplotlib axes
import dask
from dask.distributed import Client

# Scratch space used below for Dask spill and log files (adjust for your system)
lustre_scratch   = "/lustre/desc1/scratch/harshah"

# Catalog served via the Open Science Data Federation (OSDF)
catalog_url = 'https://data-osdf.gdex.ucar.edu/ncar-gdex/d010092/catalogs/d010092-osdf-zarr.json'
# Alternative: the same catalog served over HTTPS from NCAR's object store
# catalog_url = 'https://stratus.gdex.ucar.edu/d010092/catalogs/d010092-https-zarr.json'
### GMST helper functions ###
def get_lat_name(ds):
    """Return the name of the latitude coordinate in ds."""
    for lat_name in ['lat', 'latitude']:
        if lat_name in ds.coords:
            return lat_name
    raise RuntimeError("Couldn't find a latitude coordinate")

def global_mean(ds):
    """Compute the cos(latitude)-weighted mean over all dimensions
    except time and member_id."""
    lat = ds[get_lat_name(ds)]
    weight = np.cos(np.deg2rad(lat))
    weight /= weight.mean()
    other_dims = set(ds.dims) - {'time', 'member_id'}
    return (ds * weight).mean(other_dims)
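
A quick sanity check of the weighting (a minimal sketch, not part of the original workflow; the constant 288 K field is purely illustrative): a spatially uniform dataset should return exactly its constant value, because the weights are normalized to a mean of one.

# Sanity check: a constant field's global mean equals the constant,
# since the cos(lat) weights are normalized to a mean of 1.
_check = xr.Dataset(
    {"TREFHT": (("time", "lat", "lon"), np.full((2, 3, 4), 288.0))},
    coords={"time": [0, 1], "lat": [-45.0, 0.0, 45.0],
            "lon": [0.0, 90.0, 180.0, 270.0]},
)
assert np.allclose(global_mean(_check)["TREFHT"], 288.0)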

Section 2: Select Dask Cluster

  • Set up a Dask cluster. You will need to choose the type of cluster based on your use case.
  • The default is LocalCluster, since that can run on any system.
  • If running on NCAR’s HPC (i.e., Casper, which uses a PBS scheduler), set USE_PBS_SCHEDULER to True. Otherwise, set it to False.
USE_PBS_SCHEDULER = True

If running on Jupyter server with Dask Gateway configured, set to True. Otherwise, set to False.

USE_DASK_GATEWAY = False

Python function for a PBS cluster

# Create a PBS cluster object
def get_pbs_cluster():
    """ Create cluster through dask_jobqueue.   
    """
    from dask_jobqueue import PBSCluster
    cluster = PBSCluster(
        job_name = 'dask-osdf-25',
        cores = 1,
        memory = '4GiB',
        processes = 1,
        local_directory = lustre_scratch + '/dask/spill',
        log_directory = lustre_scratch + '/dask/logs/',
        resource_spec = 'select=1:ncpus=1:mem=4GB',
        queue = 'casper',
        walltime = '3:00:00',
        # interface = 'ib0'  # use the InfiniBand interface instead, if available
        interface = 'ext'
    )
    return cluster

Python function for a Gateway Cluster

def get_gateway_cluster():
    """ Create cluster through dask_gateway
    """
    from dask_gateway import Gateway

    gateway = Gateway()
    cluster = gateway.new_cluster()
    cluster.adapt(minimum=2, maximum=4)
    return cluster

Python function for a Local Cluster

def get_local_cluster():
    """ Create a cluster using the Jupyter server's resources. """
    from distributed import LocalCluster
    cluster = LocalCluster()
    return cluster

Python logic to select the Dask Cluster type

  • This uses True/False boolean logic based on the variables set in the previous cells
# Obtain dask cluster in one of three ways

if USE_PBS_SCHEDULER:
    cluster = get_pbs_cluster()
elif USE_DASK_GATEWAY:
    cluster = get_gateway_cluster()
else:
    cluster = get_local_cluster()

# Connect to the cluster, then scale to 4 workers and wait for them to start
from distributed import Client
client = Client(cluster)
cluster.scale(4)
client.wait_for_workers(4)
cluster
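Once the client is connected, the Dask dashboard is handy for watching task progress. This optional line (not in the original notebook) prints its URL:

# Print the URL of the Dask dashboard for monitoring task execution
print(client.dashboard_link)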

Section 3: Data Loading

# Open the collection description (catalog) file using intake-esm
col   = intake.open_esm_datastore(catalog_url)
col
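Before searching, it can help to peek at the underlying catalog table. This optional snippet relies on the datastore exposing its records as a pandas DataFrame via the .df attribute (used again further below):

# Inspect the first few catalog records and their columns
col.df.head()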
# Search the catalog for monthly 2-m reference height temperature (TREFHT)
cesm_temp = col.search(variable='TREFHT', frequency='monthly')
cesm_temp
cesm_temp.df['path'].values
array(['https://stratus.gdex.ucar.edu/d010092/atm/monthly/cesm2LE-historical-cmip6-TREFHT.zarr', 'https://stratus.gdex.ucar.edu/d010092/atm/monthly/cesm2LE-historical-smbb-TREFHT.zarr', 'https://stratus.gdex.ucar.edu/d010092/atm/monthly/cesm2LE-ssp370-cmip6-TREFHT.zarr', 'https://stratus.gdex.ucar.edu/d010092/atm/monthly/cesm2LE-ssp370-smbb-TREFHT.zarr'], dtype=object)
# Lazily open all matching Zarr stores as a dictionary of xarray datasets
dsets_cesm = cesm_temp.to_dataset_dict()
dsets_cesm.keys()
dict_keys(['atm.historical.monthly.cmip6', 'atm.ssp370.monthly.cmip6', 'atm.historical.monthly.smbb', 'atm.ssp370.monthly.smbb'])
historical_cmip6 = dsets_cesm['atm.historical.monthly.cmip6']
future_cmip6     = dsets_cesm['atm.ssp370.monthly.cmip6']
future_cmip6 

Make a quick plot to check data transfer

%%time
future_cmip6.TREFHT.isel(member_id=0,time=0).plot()
CPU times: user 216 ms, sys: 18.6 ms, total: 234 ms
Wall time: 2.44 s
[Figure: global map of TREFHT for member_id=0 at the first time step]

Section 4: Data Analysis

  • Perform the Global Mean Surface Temperature computation

Merge datasets and compute the Global Mean Surface Temperature anomaly

  • Warning! This section takes about a minute to run!
  • Config: 3 Dask workers with 8 GiB memory.
# Concatenate the historical and SSP3-7.0 segments along the time dimension
merge_ds_cmip6 = xr.concat([historical_cmip6, future_cmip6], dim='time')
# merge_ds_cmip6 = merge_ds_cmip6.dropna(dim='member_id')
merge_ds_cmip6 = merge_ds_cmip6.TREFHT
merge_ds_cmip6
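Before launching the computation, a rough size check (a sketch using standard xarray attributes) confirms how much data the workers will eventually stream:

# Report the total (uncomputed) array size and the number of ensemble members
print(f"{merge_ds_cmip6.nbytes / 1e9:.1f} GB across "
      f"{merge_ds_cmip6.sizes['member_id']} members")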

Compute (spatially weighted) Global Mean

# Annual means ('AS' = calendar-year start; newer pandas/xarray versions prefer 'YS')
ds_cmip6_annual = merge_ds_cmip6.resample(time='AS').mean()
ds_cmip6_annual
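Note that a plain resample weights every month equally. A slightly more accurate alternative (a sketch, not used in the results below) weights each monthly mean by the number of days in that month:

# Day-weighted annual mean: weight each monthly mean by its month length
days = merge_ds_cmip6.time.dt.days_in_month
ds_cmip6_annual_weighted = (
    (merge_ds_cmip6 * days).resample(time='AS').sum()
    / days.resample(time='AS').sum()
)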
%%time
gmst_cmip6 = global_mean(ds_cmip6_annual)
gmst_cmip6 = gmst_cmip6.rename('gmst')
gmst_cmip6

Compute anomaly and plot

# Anomaly relative to the all-time, all-member mean
gmst_cmip6_ano = gmst_cmip6 - gmst_cmip6.mean()
gmst_cmip6_ano
# Pull the computed anomaly back to the client as an in-memory array
gmst_cmip6_ano = gmst_cmip6_ano.compute()
%%time
gmst_cmip6_ano.mean(dim='member_id').plot()
CPU times: user 25.5 ms, sys: 0 ns, total: 25.5 ms
Wall time: 28.4 ms
[Figure: time series of the ensemble-mean GMST anomaly]
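Beyond the ensemble mean, the spread across members is often of interest. A hypothetical extension (shading one standard deviation with matplotlib's fill_between; it assumes nc_time_axis handles the cftime time axis, and the alpha and labels are arbitrary choices):

import matplotlib.pyplot as plt

# Ensemble mean and across-member standard deviation
ens_mean = gmst_cmip6_ano.mean(dim='member_id')
ens_std  = gmst_cmip6_ano.std(dim='member_id')

ens_mean.plot(label='ensemble mean')
plt.fill_between(ens_mean.time.values,
                 (ens_mean - ens_std).values,
                 (ens_mean + ens_std).values,
                 alpha=0.3, label='±1 std dev')
plt.legend()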
# Shut down the Dask cluster and release its resources
cluster.close()