Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

Multi-model GMST from CMIP6 zarr (~27 GCMs)

Section 1: Introduction

  • Load python packkages

  • Load catalog url

from matplotlib import pyplot as plt
import xarray as xr
import numpy as np
import dask
from dask.diagnostics import progress
from tqdm.autonotebook import tqdm
import intake
import fsspec
import seaborn as sns
import re
import aiohttp
from dask_jobqueue import PBSCluster
import pandas as pd
import os
/glade/derecho/scratch/harshah/tmp/ipykernel_36098/3841926957.py:6: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
  from tqdm.autonotebook import tqdm
import fsspec.implementations.http as fshttp
from pelicanfs.core import OSDFFileSystem,PelicanMap 
# Set up your sratch folder path
username       = os.environ["USER"]
glade_scratch  = "/glade/derecho/scratch/" + username
print(glade_scratch)
#
# cat_url     = 'https://data.gdex.ucar.edu/d850001/catalogs/cmip6-osdf-zarr.json' 
cat_url     = 'https://osdata.gdex.ucar.edu/d010096/catalogs/d010096-osdf.json'
/glade/derecho/scratch/harshah

Section 2: Select Dask Cluster

Select the Dask cluster type

The default will be LocalCluster as that can run on any system.

If running on a HPC computer with a PBS Scheduler, set to True. Otherwise, set to False.

USE_PBS_SCHEDULER = True

If running on Jupyter server with Dask Gateway configured, set to True. Otherwise, set to False.

USE_DASK_GATEWAY = False

Python function for a PBS cluster

# Create a PBS cluster object
def get_pbs_cluster():
    """ Create cluster through dask_jobqueue.   
    """
    from dask_jobqueue import PBSCluster
    cluster = PBSCluster(
        job_name = 'dask-osdf-24',
        cores = 1,
        memory = '4GiB',
        processes = 1,
        local_directory = glade_scratch + '/dask/spill',
        log_directory = glade_scratch + '/dask/logs/',
        resource_spec = 'select=1:ncpus=1:mem=4GB',
        queue = 'casper',
        walltime = '3:00:00',
        #interface = 'ib0'
        interface = 'ext'
    )
    return cluster

Python function for a Gateway Cluster

def get_gateway_cluster():
    """ Create cluster through dask_gateway
    """
    from dask_gateway import Gateway

    gateway = Gateway()
    cluster = gateway.new_cluster()
    cluster.adapt(minimum=2, maximum=4)
    return cluster
def get_local_cluster():
    """ Create cluster using the Jupyter server's resources
    """
    from distributed import LocalCluster, performance_report
    cluster = LocalCluster()    

    cluster.scale(6)
    return cluster

Python logic for a Local Cluster

This uses True/False boolean logic based on the variables set in the previous cells

# Obtain dask cluster in one of three ways
if USE_PBS_SCHEDULER:
    cluster = get_pbs_cluster()
elif USE_DASK_GATEWAY:
    cluster = get_gateway_cluster()
else:
    cluster = get_local_cluster()

# Connect to cluster
from distributed import Client
client = Client(cluster)
/glade/u/home/harshah/.conda/envs/osdf/lib/python3.11/site-packages/distributed/node.py:188: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 43085 instead
  warnings.warn(
# Scale the cluster and display cluster dashboard URL
cluster.scale(8)
cluster
Loading...

Section 3: Data Loading

  • Load catalog and select data subset

col = intake.open_esm_datastore(cat_url)
col
Loading...
[eid for eid in col.df['experiment_id'].unique() if 'ssp' in eid]
['esm-ssp585-ssp126Lu', 'ssp126-ssp370Lu', 'ssp370-ssp126Lu', 'ssp585', 'ssp245', 'ssp370-lowNTCF', 'ssp370SST-ssp126Lu', 'ssp370SST', 'ssp370pdSST', 'ssp370SST-lowCH4', 'ssp370SST-lowNTCF', 'ssp126', 'ssp119', 'ssp370', 'esm-ssp585', 'ssp245-nat', 'ssp245-GHG', 'ssp460', 'ssp434', 'ssp534-over', 'ssp245-aer', 'ssp245-stratO3', 'ssp245-cov-fossil', 'ssp245-cov-modgreen', 'ssp245-cov-strgreen', 'ssp245-covid', 'ssp585-bgc']
# there is currently a significant amount of data for these runs
expts = ['historical', 'ssp245', 'ssp370']

query = dict(
    experiment_id=expts,
    table_id='Amon',
    variable_id=['tas'],
    member_id = 'r1i1p1f1',
    #activity_id = 'CMIP',
)

col_subset = col.search(require_all_on=["source_id"], **query)
col_subset
Loading...
col_subset.df.groupby("source_id")[["experiment_id", "variable_id", "table_id","activity_id"]].nunique()
Loading...
col_subset.df
Loading...
# %%time
# dsets_osdf  = col_subset.to_dataset_dict()
# print(f"\nDataset dictionary keys:\n {dsets_osdf.keys()}")
%%time
osdf_fs = OSDFFileSystem()
test_url    = '/ncar-gdex/d010096/cmip6-pds/CMIP6/CMIP/CSIRO-ARCCSS/ACCESS-CM2/historical/r1i1p1f1/Amon/tas/gn/v20191108/'
#
# test_ds = xr.open_dataset(osdf_fs.open(test_url,mode='rb'),engine='zarr')
test_ds = xr.open_zarr('osdf:///ncar-gdex/d010096/cmip6-pds/CMIP6/CMIP/CSIRO-ARCCSS/ACCESS-CM2/historical/r1i1p1f1/Amon/tas/gn/v20191108/')
test_ds
CPU times: user 78.7 ms, sys: 1.79 ms, total: 80.5 ms
Wall time: 2.82 s
Fetching long content....
test = xr.open_zarr('https://osdf-director.osg-htc.org/ncar-gdex/d010096/AS-RCEC/')
---------------------------------------------------------------------------
GroupNotFoundError                        Traceback (most recent call last)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/zarr.py:1868, in _get_open_params(store, mode, synchronizer, group, consolidated, consolidate_on_close, chunk_store, storage_options, zarr_version, use_zarr_fill_value_as_mask, zarr_format)
   1867 try:
-> 1868     zarr_root_group = zarr.open_consolidated(store, **open_kwargs)
   1869 except (ValueError, KeyError):
   1870     # ValueError in zarr-python 3.x, KeyError in 2.x.

File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/api/synchronous.py:238, in open_consolidated(use_consolidated, *args, **kwargs)
    234 """
    235 Alias for [`open_group`][zarr.api.synchronous.open_group] with ``use_consolidated=True``.
    236 """
    237 return Group(
--> 238     sync(async_api.open_consolidated(*args, use_consolidated=use_consolidated, **kwargs))
    239 )

File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/core/sync.py:159, in sync(coro, loop, timeout)
    158 if isinstance(return_result, BaseException):
--> 159     raise return_result
    160 else:

File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/core/sync.py:119, in _runner(coro)
    118 try:
--> 119     return await coro
    120 except Exception as ex:

File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/api/asynchronous.py:415, in open_consolidated(use_consolidated, *args, **kwargs)
    411     raise TypeError(
    412         "'use_consolidated' must be 'True' in 'open_consolidated'. Use 'open' with "
    413         "'use_consolidated=False' to bypass consolidated metadata."
    414     )
--> 415 return await open_group(*args, use_consolidated=use_consolidated, **kwargs)

File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/api/asynchronous.py:881, in open_group(store, mode, cache_attrs, synchronizer, path, chunk_store, storage_options, zarr_version, zarr_format, meta_array, attributes, use_consolidated)
    880 msg = f"No group found in store {store!r} at path {store_path.path!r}"
--> 881 raise GroupNotFoundError(msg)

GroupNotFoundError: No group found in store 'https://osdf-director.osg-htc.org/ncar-gdex/d010096/AS-RCEC/' at path ''

During handling of the above exception, another exception occurred:

GroupNotFoundError                        Traceback (most recent call last)
Cell In[28], line 1
----> 1 test = xr.open_zarr('https://osdf-director.osg-htc.org/ncar-gdex/d010096/AS-RCEC/')

File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/zarr.py:1586, in open_zarr(store, group, synchronizer, chunks, decode_cf, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, consolidated, overwrite_encoded_chunks, chunk_store, storage_options, decode_timedelta, use_cftime, zarr_version, zarr_format, use_zarr_fill_value_as_mask, chunked_array_type, from_array_kwargs, create_default_indexes, **kwargs)
   1572     raise TypeError(
   1573         "open_zarr() got unexpected keyword arguments " + ",".join(kwargs.keys())
   1574     )
   1576 backend_kwargs = {
   1577     "synchronizer": synchronizer,
   1578     "consolidated": consolidated,
   (...)   1583     "zarr_format": zarr_format,
   1584 }
-> 1586 ds = open_dataset(
   1587     filename_or_obj=store,
   1588     group=group,
   1589     decode_cf=decode_cf,
   1590     mask_and_scale=mask_and_scale,
   1591     decode_times=decode_times,
   1592     concat_characters=concat_characters,
   1593     decode_coords=decode_coords,
   1594     engine="zarr",
   1595     chunks=chunks,
   1596     drop_variables=drop_variables,
   1597     create_default_indexes=create_default_indexes,
   1598     chunked_array_type=chunked_array_type,
   1599     from_array_kwargs=from_array_kwargs,
   1600     backend_kwargs=backend_kwargs,
   1601     decode_timedelta=decode_timedelta,
   1602     use_cftime=use_cftime,
   1603     zarr_version=zarr_version,
   1604     use_zarr_fill_value_as_mask=use_zarr_fill_value_as_mask,
   1605 )
   1606 return ds

File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/api.py:606, in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, create_default_indexes, inline_array, chunked_array_type, from_array_kwargs, backend_kwargs, **kwargs)
    594 decoders = _resolve_decoders_kwargs(
    595     decode_cf,
    596     open_backend_dataset_parameters=backend.open_dataset_parameters,
   (...)    602     decode_coords=decode_coords,
    603 )
    605 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 606 backend_ds = backend.open_dataset(
    607     filename_or_obj,
    608     drop_variables=drop_variables,
    609     **decoders,
    610     **kwargs,
    611 )
    612 ds = _dataset_from_backend_dataset(
    613     backend_ds,
    614     filename_or_obj,
   (...)    625     **kwargs,
    626 )
    627 return ds

File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/zarr.py:1660, in ZarrBackendEntrypoint.open_dataset(self, filename_or_obj, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, use_cftime, decode_timedelta, group, mode, synchronizer, consolidated, chunk_store, storage_options, zarr_version, zarr_format, store, engine, use_zarr_fill_value_as_mask, cache_members)
   1658 filename_or_obj = _normalize_path(filename_or_obj)
   1659 if not store:
-> 1660     store = ZarrStore.open_group(
   1661         filename_or_obj,
   1662         group=group,
   1663         mode=mode,
   1664         synchronizer=synchronizer,
   1665         consolidated=consolidated,
   1666         consolidate_on_close=False,
   1667         chunk_store=chunk_store,
   1668         storage_options=storage_options,
   1669         zarr_version=zarr_version,
   1670         use_zarr_fill_value_as_mask=None,
   1671         zarr_format=zarr_format,
   1672         cache_members=cache_members,
   1673     )
   1675 store_entrypoint = StoreBackendEntrypoint()
   1676 with close_on_error(store):

File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/zarr.py:714, in ZarrStore.open_group(cls, store, mode, synchronizer, group, consolidated, consolidate_on_close, chunk_store, storage_options, append_dim, write_region, safe_chunks, align_chunks, zarr_version, zarr_format, use_zarr_fill_value_as_mask, write_empty, cache_members)
    688 @classmethod
    689 def open_group(
    690     cls,
   (...)    707     cache_members: bool = True,
    708 ):
    709     (
    710         zarr_group,
    711         consolidate_on_close,
    712         close_store_on_close,
    713         use_zarr_fill_value_as_mask,
--> 714     ) = _get_open_params(
    715         store=store,
    716         mode=mode,
    717         synchronizer=synchronizer,
    718         group=group,
    719         consolidated=consolidated,
    720         consolidate_on_close=consolidate_on_close,
    721         chunk_store=chunk_store,
    722         storage_options=storage_options,
    723         zarr_version=zarr_version,
    724         use_zarr_fill_value_as_mask=use_zarr_fill_value_as_mask,
    725         zarr_format=zarr_format,
    726     )
    728     return cls(
    729         zarr_group,
    730         mode,
   (...)    739         cache_members=cache_members,
    740     )

File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/zarr.py:1872, in _get_open_params(store, mode, synchronizer, group, consolidated, consolidate_on_close, chunk_store, storage_options, zarr_version, use_zarr_fill_value_as_mask, zarr_format)
   1869 except (ValueError, KeyError):
   1870     # ValueError in zarr-python 3.x, KeyError in 2.x.
   1871     try:
-> 1872         zarr_root_group = zarr.open_group(store, **open_kwargs)
   1873         emit_user_level_warning(
   1874             "Failed to open Zarr store with consolidated metadata, "
   1875             "but successfully read with non-consolidated metadata. "
   (...)   1885             RuntimeWarning,
   1886         )
   1887     except missing_exc as err:

File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/api/synchronous.py:549, in open_group(store, mode, cache_attrs, synchronizer, path, chunk_store, storage_options, zarr_version, zarr_format, meta_array, attributes, use_consolidated)
    478 def open_group(
    479     store: StoreLike | None = None,
    480     *,
   (...)    491     use_consolidated: bool | str | None = None,
    492 ) -> Group:
    493     """Open a group using file-mode-like semantics.
    494 
    495     Parameters
   (...)    546         The new group.
    547     """
    548     return Group(
--> 549         sync(
    550             async_api.open_group(
    551                 store=store,
    552                 mode=mode,
    553                 cache_attrs=cache_attrs,
    554                 synchronizer=synchronizer,
    555                 path=path,
    556                 chunk_store=chunk_store,
    557                 storage_options=storage_options,
    558                 zarr_version=zarr_version,
    559                 zarr_format=zarr_format,
    560                 meta_array=meta_array,
    561                 attributes=attributes,
    562                 use_consolidated=use_consolidated,
    563             )
    564         )
    565     )

File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/core/sync.py:159, in sync(coro, loop, timeout)
    156 return_result = next(iter(finished)).result()
    158 if isinstance(return_result, BaseException):
--> 159     raise return_result
    160 else:
    161     return return_result

File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/core/sync.py:119, in _runner(coro)
    114 """
    115 Await a coroutine and return the result of running it. If awaiting the coroutine raises an
    116 exception, the exception will be returned.
    117 """
    118 try:
--> 119     return await coro
    120 except Exception as ex:
    121     return ex

File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/api/asynchronous.py:881, in open_group(store, mode, cache_attrs, synchronizer, path, chunk_store, storage_options, zarr_version, zarr_format, meta_array, attributes, use_consolidated)
    874     return await AsyncGroup.from_store(
    875         store_path,
    876         zarr_format=_zarr_format,
    877         overwrite=overwrite,
    878         attributes=attributes,
    879     )
    880 msg = f"No group found in store {store!r} at path {store_path.path!r}"
--> 881 raise GroupNotFoundError(msg)

GroupNotFoundError: No group found in store 'https://osdf-director.osg-htc.org/ncar-gdex/d010096/AS-RCEC/' at path ''
def drop_all_bounds(ds):
    drop_vars = [vname for vname in ds.coords
                 if (('_bounds') in vname ) or ('_bnds') in vname]
    return ds.drop_vars(drop_vars)

# def open_dset(df):
#     assert len(df) == 1
#     mapper = fsspec.get_mapper(df.zstore.values[0])
#     path = df.zstore.values[0][7:]+".zmetadata"
#     ds = xr.open_zarr(mapper, consolidated=True, zarr_format=2) #Specify version of the zarr store
#     a_data = mapper.fs.get_access_data()
#     responses = a_data.get_responses(path)
#     print(responses)
#     return drop_all_bounds(ds)

def open_dset(df):
    assert len(df) == 1
    mapper = fsspec.get_mapper(df.zstore.values[0])
    #path = df.zstore.values[0][7:]+".zmetadata"
    ds = xr.open_zarr(mapper, consolidated=True,zarr_format=2)
    return drop_all_bounds(ds)

def open_delayed(df):
    return dask.delayed(open_dset)(df)

from collections import defaultdict
dsets = defaultdict(dict)

for group, df in col_subset.df.groupby(by=['source_id', 'experiment_id']):
    dsets[group[0]][group[1]] = open_delayed(df)
dsets_ = dask.compute(dict(dsets))[0]
---------------------------------------------------------------------------
FileNotFoundError                         Traceback (most recent call last)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/pelicanfs/core.py:1155, in wrapper()
   1154     logger.debug(f"Calling {func} using the following url: {data_url}")
-> 1155     result = await func(self, data_url, *args[1:], **kwargs)
   1156 except Exception as e:

File ~/.conda/envs/osdf/lib/python3.11/site-packages/pelicanfs/core.py:1236, in _cat_file()
   1234 @_cache_dec
   1235 async def _cat_file(self, path, start=None, end=None, **kwargs):
-> 1236     return await self.http_file_system._cat_file(path, start, end, **kwargs)

File ~/.conda/envs/osdf/lib/python3.11/site-packages/fsspec/implementations/http.py:247, in _cat_file()
    246     out = await r.read()
--> 247     self._raise_not_found_for_status(r, url)
    248 return out

File ~/.conda/envs/osdf/lib/python3.11/site-packages/fsspec/implementations/http.py:229, in _raise_not_found_for_status()
    228 if response.status == 404:
--> 229     raise FileNotFoundError(url)
    230 response.raise_for_status()

FileNotFoundError: https://ncar-cache.nationalresearchplatform.org:8443/ncar-gdex/d010096/cmip6-pds/CMIP6/CMIP/NOAA-GFDL/GFDL-ESM4/historical/r1i1p1f1/Amon/tas/gr1/v20190726/.zgroup

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
Cell In[18], line 1
----> 1 dsets_ = dask.compute(dict(dsets))[0]

File ~/.conda/envs/osdf/lib/python3.11/site-packages/dask/base.py:685, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    682     expr = expr.optimize()
    683     keys = list(flatten(expr.__dask_keys__()))
--> 685     results = schedule(expr, keys, **kwargs)
    687 return repack(results)

Cell In[17], line 20, in open_dset()
     18 mapper = fsspec.get_mapper(df.zstore.values[0])
     19 #path = df.zstore.values[0][7:]+".zmetadata"
---> 20 ds = xr.open_zarr(mapper, consolidated=True,zarr_format=2)
     21 return drop_all_bounds(ds)

File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/zarr.py:1586, in open_zarr()
   1572     raise TypeError(
   1573         "open_zarr() got unexpected keyword arguments " + ",".join(kwargs.keys())
   1574     )
   1576 backend_kwargs = {
   1577     "synchronizer": synchronizer,
   1578     "consolidated": consolidated,
   (...)   1583     "zarr_format": zarr_format,
   1584 }
-> 1586 ds = open_dataset(
   1587     filename_or_obj=store,
   1588     group=group,
   1589     decode_cf=decode_cf,
   1590     mask_and_scale=mask_and_scale,
   1591     decode_times=decode_times,
   1592     concat_characters=concat_characters,
   1593     decode_coords=decode_coords,
   1594     engine="zarr",
   1595     chunks=chunks,
   1596     drop_variables=drop_variables,
   1597     create_default_indexes=create_default_indexes,
   1598     chunked_array_type=chunked_array_type,
   1599     from_array_kwargs=from_array_kwargs,
   1600     backend_kwargs=backend_kwargs,
   1601     decode_timedelta=decode_timedelta,
   1602     use_cftime=use_cftime,
   1603     zarr_version=zarr_version,
   1604     use_zarr_fill_value_as_mask=use_zarr_fill_value_as_mask,
   1605 )
   1606 return ds

File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/api.py:606, in open_dataset()
    594 decoders = _resolve_decoders_kwargs(
    595     decode_cf,
    596     open_backend_dataset_parameters=backend.open_dataset_parameters,
   (...)    602     decode_coords=decode_coords,
    603 )
    605 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 606 backend_ds = backend.open_dataset(
    607     filename_or_obj,
    608     drop_variables=drop_variables,
    609     **decoders,
    610     **kwargs,
    611 )
    612 ds = _dataset_from_backend_dataset(
    613     backend_ds,
    614     filename_or_obj,
   (...)    625     **kwargs,
    626 )
    627 return ds

File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/zarr.py:1660, in open_dataset()
   1658 filename_or_obj = _normalize_path(filename_or_obj)
   1659 if not store:
-> 1660     store = ZarrStore.open_group(
   1661         filename_or_obj,
   1662         group=group,
   1663         mode=mode,
   1664         synchronizer=synchronizer,
   1665         consolidated=consolidated,
   1666         consolidate_on_close=False,
   1667         chunk_store=chunk_store,
   1668         storage_options=storage_options,
   1669         zarr_version=zarr_version,
   1670         use_zarr_fill_value_as_mask=None,
   1671         zarr_format=zarr_format,
   1672         cache_members=cache_members,
   1673     )
   1675 store_entrypoint = StoreBackendEntrypoint()
   1676 with close_on_error(store):

File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/zarr.py:714, in open_group()
    688 @classmethod
    689 def open_group(
    690     cls,
   (...)    707     cache_members: bool = True,
    708 ):
    709     (
    710         zarr_group,
    711         consolidate_on_close,
    712         close_store_on_close,
    713         use_zarr_fill_value_as_mask,
--> 714     ) = _get_open_params(
    715         store=store,
    716         mode=mode,
    717         synchronizer=synchronizer,
    718         group=group,
    719         consolidated=consolidated,
    720         consolidate_on_close=consolidate_on_close,
    721         chunk_store=chunk_store,
    722         storage_options=storage_options,
    723         zarr_version=zarr_version,
    724         use_zarr_fill_value_as_mask=use_zarr_fill_value_as_mask,
    725         zarr_format=zarr_format,
    726     )
    728     return cls(
    729         zarr_group,
    730         mode,
   (...)    739         cache_members=cache_members,
    740     )

File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/zarr.py:1864, in _get_open_params()
   1860 group = open_kwargs.pop("path")
   1862 if consolidated:
   1863     # TODO: an option to pass the metadata_key keyword
-> 1864     zarr_root_group = zarr.open_consolidated(store, **open_kwargs)
   1865 elif consolidated is None:
   1866     # same but with more error handling in case no consolidated metadata found
   1867     try:

File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/api/synchronous.py:238, in open_consolidated()
    233 def open_consolidated(*args: Any, use_consolidated: Literal[True] = True, **kwargs: Any) -> Group:
    234     """
    235     Alias for [`open_group`][zarr.api.synchronous.open_group] with ``use_consolidated=True``.
    236     """
    237     return Group(
--> 238         sync(async_api.open_consolidated(*args, use_consolidated=use_consolidated, **kwargs))
    239     )

File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/core/sync.py:159, in sync()
    156 return_result = next(iter(finished)).result()
    158 if isinstance(return_result, BaseException):
--> 159     raise return_result
    160 else:
    161     return return_result

File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/core/sync.py:119, in _runner()
    114 """
    115 Await a coroutine and return the result of running it. If awaiting the coroutine raises an
    116 exception, the exception will be returned.
    117 """
    118 try:
--> 119     return await coro
    120 except Exception as ex:
    121     return ex

File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/api/asynchronous.py:415, in open_consolidated()
    410 if use_consolidated is not True:
    411     raise TypeError(
    412         "'use_consolidated' must be 'True' in 'open_consolidated'. Use 'open' with "
    413         "'use_consolidated=False' to bypass consolidated metadata."
    414     )
--> 415 return await open_group(*args, use_consolidated=use_consolidated, **kwargs)

File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/api/asynchronous.py:866, in open_group()
    864 try:
    865     if mode in _READ_MODES:
--> 866         return await AsyncGroup.open(
    867             store_path, zarr_format=zarr_format, use_consolidated=use_consolidated
    868         )
    869 except (KeyError, FileNotFoundError):
    870     pass

File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/core/group.py:548, in open()
    545 if use_consolidated or use_consolidated is None:
    546     paths.append(store_path / consolidated_key)
--> 548 zgroup_bytes, zattrs_bytes, *rest = await asyncio.gather(
    549     *[path.get() for path in paths]
    550 )
    551 if zgroup_bytes is None:
    552     raise FileNotFoundError(store_path)

File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/storage/_common.py:168, in get()
    166 if prototype is None:
    167     prototype = default_buffer_prototype()
--> 168 return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)

File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/storage/_fsspec.py:289, in get()
    287 try:
    288     if byte_range is None:
--> 289         value = prototype.buffer.from_bytes(await self.fs._cat_file(path))
    290     elif isinstance(byte_range, RangeByteRequest):
    291         value = prototype.buffer.from_bytes(
    292             await self.fs._cat_file(
    293                 path,
   (...)    296             )
    297         )

File ~/.conda/envs/osdf/lib/python3.11/site-packages/pelicanfs/core.py:1158, in wrapper()
   1156 except Exception as e:
   1157     if not self.direct_reads:
-> 1158         self._bad_cache(data_url, e)
   1159     raise
   1160 if not self.direct_reads:

File ~/.conda/envs/osdf/lib/python3.11/site-packages/pelicanfs/core.py:740, in _bad_cache()
    738 if not namespace_info:
    739     return
--> 740 namespace_info.cache_manager.bad_cache(bad_cache)

File ~/.conda/envs/osdf/lib/python3.11/site-packages/pelicanfs/core.py:178, in bad_cache()
    176 bad_cache_url = cache_url_parsed.geturl()
    177 with self._lock:
--> 178     self._cache_list.remove(bad_cache_url)

ValueError: list.remove(x): x not in list
#calculate global means
def get_lat_name(ds):
    for lat_name in ['lat', 'latitude']:
        if lat_name in ds.coords:
            return lat_name
    raise RuntimeError("Couldn't find a latitude coordinate")

def global_mean(ds):
    lat = ds[get_lat_name(ds)]
    weight = np.cos(np.deg2rad(lat))
    weight /= weight.mean()
    other_dims = set(ds.dims) - {'time'}
    return (ds * weight).mean(other_dims)
expt_da = xr.DataArray(expts, dims='experiment_id', name='experiment_id',
                       coords={'experiment_id': expts})

dsets_aligned = {}

for k, v in tqdm(dsets_.items()):
    expt_dsets = v.values()
    if any([d is None for d in expt_dsets]):
        print(f"Missing experiment for {k}")
        continue

    for ds in expt_dsets:
        ds.coords['year'] = ds.time.dt.year

    # workaround for
    # https://github.com/pydata/xarray/issues/2237#issuecomment-620961663
    dsets_ann_mean = [v[expt].pipe(global_mean).swap_dims({'time': 'year'})
                             .drop_vars('time').coarsen(year=12).mean()
                      for expt in expts]

    # align everything with the 4xCO2 experiment
    dsets_aligned[k] = xr.concat(dsets_ann_mean, join='outer',dim=expt_da)
%%time
with progress.ProgressBar():
    dsets_aligned_ = dask.compute(dsets_aligned)[0]
source_ids = list(dsets_aligned_.keys())
source_da = xr.DataArray(source_ids, dims='source_id', name='source_id',
                         coords={'source_id': source_ids})

big_ds = xr.concat([ds.reset_coords(drop=True)
                    for ds in dsets_aligned_.values()],
                    dim=source_da)

big_ds

Observational time series data for comparison with ensemble spread

  • The observational data we will use is the HadCRUT5 dataset from the UK Met Office

  • The data has been downloaded to NCAR’s Geoscience Data Exchange (GDEX) from https://www.metoffice.gov.uk/hadobs/hadcrut5/

  • We will use an OSDF file system to access this copy from the GDEX

osdf_fs = OSDFFileSystem()
print(osdf_fs)
#
obs_url    = '/ncar/gdex/d850001/HadCRUT.5.0.2.0.analysis.summary_series.global.monthly.nc'
#
obs_ds = xr.open_dataset(osdf_fs.open(obs_url,mode='rb'),engine='h5netcdf').tas_mean
obs_ds
# obs_url    = 'osdf:///ncar/gdex/d850001/HadCRUT.5.0.2.0.analysis.summary_series.global.monthly.zarr'
# print(obs_url)
# #
# obs_ds = xr.open_zarr(obs_url).tas_mean
# # obs_ds
# Compute annual mean temperatures anomalies
obs_gmsta = obs_ds.resample(time='YS').mean(dim='time')
obs_gmsta

Section 4: Data Analysis

  • Calculate Global Mean Surface Temperature Anomaly (GMSTA)

  • Grab some observations/ ERA5 reanalysis data

  • Convert xarray datasets to dataframes

  • Use Seaborn to plot GMSTA

df_all = big_ds.to_dataframe().reset_index()
df_all.head()
# Compute anomaly w.r.t 1960-1990 baseline
# Define the baseline period
baseline_df = df_all[(df_all["year"] >= 1960) & (df_all["year"] <= 1990)]

# Compute the baseline mean
baseline_mean = baseline_df["tas"].mean()

# Compute anomalies
df_all["tas_anomaly"] = df_all["tas"] - baseline_mean
df_all
obs_df = obs_gmsta.to_dataframe(name='tas_anomaly').reset_index()
# obs_df
# Convert 'time' to 'year' (keeping only the year)
obs_df['year'] = obs_df['time'].dt.year

# Drop the original 'time' column since we extracted 'year'
obs_df = obs_df[['year', 'tas_anomaly']]
obs_df
# Create the main plot
g = sns.relplot(data=df_all, x="year", y="tas_anomaly",
                hue='experiment_id', kind="line", errorbar="sd", aspect=2, palette="Set2")  # Adjust the color palette)

# Get the current axis from the FacetGrid
ax = g.ax

# Overlay the observational data in red
sns.lineplot(data=obs_df, x="year", y="tas_anomaly",color="red", 
             linestyle="dashed", linewidth=2,label="Observations", ax=ax)

# Adjust the legend to include observations
ax.legend(title="Experiment ID + Observations")

# Show the plot
plt.show()
cluster.close()