origin:aws origin:ncar-posix origin:ncar-object-store platform:casper dataset:cmip6 task:visualization level:intermediate
Access CMIP6 zarr data from AWS using the osdf protocol and compute GMSTA¶
This workflow is inspired by https://
gallery .pangeo .io /repos /pangeo -gallery /cmip6 /global _mean _surface _temp .html
Table of Contents¶
Section 1: Introduction¶
Load python packkages
Load catalog url
from matplotlib import pyplot as plt
import xarray as xr
import numpy as np
import dask
from dask.diagnostics import progress
from tqdm.autonotebook import tqdm
import intake
import fsspec
import seaborn as sns
import re
import aiohttp
from dask_jobqueue import PBSCluster
import pandas as pd
import os/glade/derecho/scratch/harshah/tmp/ipykernel_36098/3841926957.py:6: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
from tqdm.autonotebook import tqdm
import fsspec.implementations.http as fshttp
from pelicanfs.core import OSDFFileSystem,PelicanMap # Set up your sratch folder path
username = os.environ["USER"]
glade_scratch = "/glade/derecho/scratch/" + username
print(glade_scratch)
#
# cat_url = 'https://data.gdex.ucar.edu/d850001/catalogs/cmip6-osdf-zarr.json'
cat_url = 'https://osdata.gdex.ucar.edu/d010096/catalogs/d010096-osdf.json'/glade/derecho/scratch/harshah
Section 2: Select Dask Cluster¶
Select the Dask cluster type¶
The default will be LocalCluster as that can run on any system.
If running on a HPC computer with a PBS Scheduler, set to True. Otherwise, set to False.
USE_PBS_SCHEDULER = TrueIf running on Jupyter server with Dask Gateway configured, set to True. Otherwise, set to False.
USE_DASK_GATEWAY = FalsePython function for a PBS cluster¶
# Create a PBS cluster object
def get_pbs_cluster():
""" Create cluster through dask_jobqueue.
"""
from dask_jobqueue import PBSCluster
cluster = PBSCluster(
job_name = 'dask-osdf-24',
cores = 1,
memory = '4GiB',
processes = 1,
local_directory = glade_scratch + '/dask/spill',
log_directory = glade_scratch + '/dask/logs/',
resource_spec = 'select=1:ncpus=1:mem=4GB',
queue = 'casper',
walltime = '3:00:00',
#interface = 'ib0'
interface = 'ext'
)
return clusterPython function for a Gateway Cluster¶
def get_gateway_cluster():
""" Create cluster through dask_gateway
"""
from dask_gateway import Gateway
gateway = Gateway()
cluster = gateway.new_cluster()
cluster.adapt(minimum=2, maximum=4)
return clusterdef get_local_cluster():
""" Create cluster using the Jupyter server's resources
"""
from distributed import LocalCluster, performance_report
cluster = LocalCluster()
cluster.scale(6)
return clusterPython logic for a Local Cluster¶
This uses True/False boolean logic based on the variables set in the previous cells
# Obtain dask cluster in one of three ways
if USE_PBS_SCHEDULER:
cluster = get_pbs_cluster()
elif USE_DASK_GATEWAY:
cluster = get_gateway_cluster()
else:
cluster = get_local_cluster()
# Connect to cluster
from distributed import Client
client = Client(cluster)/glade/u/home/harshah/.conda/envs/osdf/lib/python3.11/site-packages/distributed/node.py:188: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 43085 instead
warnings.warn(
# Scale the cluster and display cluster dashboard URL
cluster.scale(8)
clusterSection 3: Data Loading¶
Load catalog and select data subset
col = intake.open_esm_datastore(cat_url)
col[eid for eid in col.df['experiment_id'].unique() if 'ssp' in eid]['esm-ssp585-ssp126Lu',
'ssp126-ssp370Lu',
'ssp370-ssp126Lu',
'ssp585',
'ssp245',
'ssp370-lowNTCF',
'ssp370SST-ssp126Lu',
'ssp370SST',
'ssp370pdSST',
'ssp370SST-lowCH4',
'ssp370SST-lowNTCF',
'ssp126',
'ssp119',
'ssp370',
'esm-ssp585',
'ssp245-nat',
'ssp245-GHG',
'ssp460',
'ssp434',
'ssp534-over',
'ssp245-aer',
'ssp245-stratO3',
'ssp245-cov-fossil',
'ssp245-cov-modgreen',
'ssp245-cov-strgreen',
'ssp245-covid',
'ssp585-bgc']# there is currently a significant amount of data for these runs
expts = ['historical', 'ssp245', 'ssp370']
query = dict(
experiment_id=expts,
table_id='Amon',
variable_id=['tas'],
member_id = 'r1i1p1f1',
#activity_id = 'CMIP',
)
col_subset = col.search(require_all_on=["source_id"], **query)
col_subsetcol_subset.df.groupby("source_id")[["experiment_id", "variable_id", "table_id","activity_id"]].nunique()col_subset.df# %%time
# dsets_osdf = col_subset.to_dataset_dict()
# print(f"\nDataset dictionary keys:\n {dsets_osdf.keys()}")%%time
osdf_fs = OSDFFileSystem()
test_url = '/ncar-gdex/d010096/cmip6-pds/CMIP6/CMIP/CSIRO-ARCCSS/ACCESS-CM2/historical/r1i1p1f1/Amon/tas/gn/v20191108/'
#
# test_ds = xr.open_dataset(osdf_fs.open(test_url,mode='rb'),engine='zarr')
test_ds = xr.open_zarr('osdf:///ncar-gdex/d010096/cmip6-pds/CMIP6/CMIP/CSIRO-ARCCSS/ACCESS-CM2/historical/r1i1p1f1/Amon/tas/gn/v20191108/')
test_dsCPU times: user 78.7 ms, sys: 1.79 ms, total: 80.5 ms
Wall time: 2.82 s
test = xr.open_zarr('https://osdf-director.osg-htc.org/ncar-gdex/d010096/AS-RCEC/')---------------------------------------------------------------------------
GroupNotFoundError Traceback (most recent call last)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/zarr.py:1868, in _get_open_params(store, mode, synchronizer, group, consolidated, consolidate_on_close, chunk_store, storage_options, zarr_version, use_zarr_fill_value_as_mask, zarr_format)
1867 try:
-> 1868 zarr_root_group = zarr.open_consolidated(store, **open_kwargs)
1869 except (ValueError, KeyError):
1870 # ValueError in zarr-python 3.x, KeyError in 2.x.
File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/api/synchronous.py:238, in open_consolidated(use_consolidated, *args, **kwargs)
234 """
235 Alias for [`open_group`][zarr.api.synchronous.open_group] with ``use_consolidated=True``.
236 """
237 return Group(
--> 238 sync(async_api.open_consolidated(*args, use_consolidated=use_consolidated, **kwargs))
239 )
File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/core/sync.py:159, in sync(coro, loop, timeout)
158 if isinstance(return_result, BaseException):
--> 159 raise return_result
160 else:
File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/core/sync.py:119, in _runner(coro)
118 try:
--> 119 return await coro
120 except Exception as ex:
File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/api/asynchronous.py:415, in open_consolidated(use_consolidated, *args, **kwargs)
411 raise TypeError(
412 "'use_consolidated' must be 'True' in 'open_consolidated'. Use 'open' with "
413 "'use_consolidated=False' to bypass consolidated metadata."
414 )
--> 415 return await open_group(*args, use_consolidated=use_consolidated, **kwargs)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/api/asynchronous.py:881, in open_group(store, mode, cache_attrs, synchronizer, path, chunk_store, storage_options, zarr_version, zarr_format, meta_array, attributes, use_consolidated)
880 msg = f"No group found in store {store!r} at path {store_path.path!r}"
--> 881 raise GroupNotFoundError(msg)
GroupNotFoundError: No group found in store 'https://osdf-director.osg-htc.org/ncar-gdex/d010096/AS-RCEC/' at path ''
During handling of the above exception, another exception occurred:
GroupNotFoundError Traceback (most recent call last)
Cell In[28], line 1
----> 1 test = xr.open_zarr('https://osdf-director.osg-htc.org/ncar-gdex/d010096/AS-RCEC/')
File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/zarr.py:1586, in open_zarr(store, group, synchronizer, chunks, decode_cf, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, consolidated, overwrite_encoded_chunks, chunk_store, storage_options, decode_timedelta, use_cftime, zarr_version, zarr_format, use_zarr_fill_value_as_mask, chunked_array_type, from_array_kwargs, create_default_indexes, **kwargs)
1572 raise TypeError(
1573 "open_zarr() got unexpected keyword arguments " + ",".join(kwargs.keys())
1574 )
1576 backend_kwargs = {
1577 "synchronizer": synchronizer,
1578 "consolidated": consolidated,
(...) 1583 "zarr_format": zarr_format,
1584 }
-> 1586 ds = open_dataset(
1587 filename_or_obj=store,
1588 group=group,
1589 decode_cf=decode_cf,
1590 mask_and_scale=mask_and_scale,
1591 decode_times=decode_times,
1592 concat_characters=concat_characters,
1593 decode_coords=decode_coords,
1594 engine="zarr",
1595 chunks=chunks,
1596 drop_variables=drop_variables,
1597 create_default_indexes=create_default_indexes,
1598 chunked_array_type=chunked_array_type,
1599 from_array_kwargs=from_array_kwargs,
1600 backend_kwargs=backend_kwargs,
1601 decode_timedelta=decode_timedelta,
1602 use_cftime=use_cftime,
1603 zarr_version=zarr_version,
1604 use_zarr_fill_value_as_mask=use_zarr_fill_value_as_mask,
1605 )
1606 return ds
File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/api.py:606, in open_dataset(filename_or_obj, engine, chunks, cache, decode_cf, mask_and_scale, decode_times, decode_timedelta, use_cftime, concat_characters, decode_coords, drop_variables, create_default_indexes, inline_array, chunked_array_type, from_array_kwargs, backend_kwargs, **kwargs)
594 decoders = _resolve_decoders_kwargs(
595 decode_cf,
596 open_backend_dataset_parameters=backend.open_dataset_parameters,
(...) 602 decode_coords=decode_coords,
603 )
605 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 606 backend_ds = backend.open_dataset(
607 filename_or_obj,
608 drop_variables=drop_variables,
609 **decoders,
610 **kwargs,
611 )
612 ds = _dataset_from_backend_dataset(
613 backend_ds,
614 filename_or_obj,
(...) 625 **kwargs,
626 )
627 return ds
File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/zarr.py:1660, in ZarrBackendEntrypoint.open_dataset(self, filename_or_obj, mask_and_scale, decode_times, concat_characters, decode_coords, drop_variables, use_cftime, decode_timedelta, group, mode, synchronizer, consolidated, chunk_store, storage_options, zarr_version, zarr_format, store, engine, use_zarr_fill_value_as_mask, cache_members)
1658 filename_or_obj = _normalize_path(filename_or_obj)
1659 if not store:
-> 1660 store = ZarrStore.open_group(
1661 filename_or_obj,
1662 group=group,
1663 mode=mode,
1664 synchronizer=synchronizer,
1665 consolidated=consolidated,
1666 consolidate_on_close=False,
1667 chunk_store=chunk_store,
1668 storage_options=storage_options,
1669 zarr_version=zarr_version,
1670 use_zarr_fill_value_as_mask=None,
1671 zarr_format=zarr_format,
1672 cache_members=cache_members,
1673 )
1675 store_entrypoint = StoreBackendEntrypoint()
1676 with close_on_error(store):
File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/zarr.py:714, in ZarrStore.open_group(cls, store, mode, synchronizer, group, consolidated, consolidate_on_close, chunk_store, storage_options, append_dim, write_region, safe_chunks, align_chunks, zarr_version, zarr_format, use_zarr_fill_value_as_mask, write_empty, cache_members)
688 @classmethod
689 def open_group(
690 cls,
(...) 707 cache_members: bool = True,
708 ):
709 (
710 zarr_group,
711 consolidate_on_close,
712 close_store_on_close,
713 use_zarr_fill_value_as_mask,
--> 714 ) = _get_open_params(
715 store=store,
716 mode=mode,
717 synchronizer=synchronizer,
718 group=group,
719 consolidated=consolidated,
720 consolidate_on_close=consolidate_on_close,
721 chunk_store=chunk_store,
722 storage_options=storage_options,
723 zarr_version=zarr_version,
724 use_zarr_fill_value_as_mask=use_zarr_fill_value_as_mask,
725 zarr_format=zarr_format,
726 )
728 return cls(
729 zarr_group,
730 mode,
(...) 739 cache_members=cache_members,
740 )
File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/zarr.py:1872, in _get_open_params(store, mode, synchronizer, group, consolidated, consolidate_on_close, chunk_store, storage_options, zarr_version, use_zarr_fill_value_as_mask, zarr_format)
1869 except (ValueError, KeyError):
1870 # ValueError in zarr-python 3.x, KeyError in 2.x.
1871 try:
-> 1872 zarr_root_group = zarr.open_group(store, **open_kwargs)
1873 emit_user_level_warning(
1874 "Failed to open Zarr store with consolidated metadata, "
1875 "but successfully read with non-consolidated metadata. "
(...) 1885 RuntimeWarning,
1886 )
1887 except missing_exc as err:
File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/api/synchronous.py:549, in open_group(store, mode, cache_attrs, synchronizer, path, chunk_store, storage_options, zarr_version, zarr_format, meta_array, attributes, use_consolidated)
478 def open_group(
479 store: StoreLike | None = None,
480 *,
(...) 491 use_consolidated: bool | str | None = None,
492 ) -> Group:
493 """Open a group using file-mode-like semantics.
494
495 Parameters
(...) 546 The new group.
547 """
548 return Group(
--> 549 sync(
550 async_api.open_group(
551 store=store,
552 mode=mode,
553 cache_attrs=cache_attrs,
554 synchronizer=synchronizer,
555 path=path,
556 chunk_store=chunk_store,
557 storage_options=storage_options,
558 zarr_version=zarr_version,
559 zarr_format=zarr_format,
560 meta_array=meta_array,
561 attributes=attributes,
562 use_consolidated=use_consolidated,
563 )
564 )
565 )
File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/core/sync.py:159, in sync(coro, loop, timeout)
156 return_result = next(iter(finished)).result()
158 if isinstance(return_result, BaseException):
--> 159 raise return_result
160 else:
161 return return_result
File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/core/sync.py:119, in _runner(coro)
114 """
115 Await a coroutine and return the result of running it. If awaiting the coroutine raises an
116 exception, the exception will be returned.
117 """
118 try:
--> 119 return await coro
120 except Exception as ex:
121 return ex
File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/api/asynchronous.py:881, in open_group(store, mode, cache_attrs, synchronizer, path, chunk_store, storage_options, zarr_version, zarr_format, meta_array, attributes, use_consolidated)
874 return await AsyncGroup.from_store(
875 store_path,
876 zarr_format=_zarr_format,
877 overwrite=overwrite,
878 attributes=attributes,
879 )
880 msg = f"No group found in store {store!r} at path {store_path.path!r}"
--> 881 raise GroupNotFoundError(msg)
GroupNotFoundError: No group found in store 'https://osdf-director.osg-htc.org/ncar-gdex/d010096/AS-RCEC/' at path ''def drop_all_bounds(ds):
drop_vars = [vname for vname in ds.coords
if (('_bounds') in vname ) or ('_bnds') in vname]
return ds.drop_vars(drop_vars)
# def open_dset(df):
# assert len(df) == 1
# mapper = fsspec.get_mapper(df.zstore.values[0])
# path = df.zstore.values[0][7:]+".zmetadata"
# ds = xr.open_zarr(mapper, consolidated=True, zarr_format=2) #Specify version of the zarr store
# a_data = mapper.fs.get_access_data()
# responses = a_data.get_responses(path)
# print(responses)
# return drop_all_bounds(ds)
def open_dset(df):
assert len(df) == 1
mapper = fsspec.get_mapper(df.zstore.values[0])
#path = df.zstore.values[0][7:]+".zmetadata"
ds = xr.open_zarr(mapper, consolidated=True,zarr_format=2)
return drop_all_bounds(ds)
def open_delayed(df):
return dask.delayed(open_dset)(df)
from collections import defaultdict
dsets = defaultdict(dict)
for group, df in col_subset.df.groupby(by=['source_id', 'experiment_id']):
dsets[group[0]][group[1]] = open_delayed(df)dsets_ = dask.compute(dict(dsets))[0]---------------------------------------------------------------------------
FileNotFoundError Traceback (most recent call last)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/pelicanfs/core.py:1155, in wrapper()
1154 logger.debug(f"Calling {func} using the following url: {data_url}")
-> 1155 result = await func(self, data_url, *args[1:], **kwargs)
1156 except Exception as e:
File ~/.conda/envs/osdf/lib/python3.11/site-packages/pelicanfs/core.py:1236, in _cat_file()
1234 @_cache_dec
1235 async def _cat_file(self, path, start=None, end=None, **kwargs):
-> 1236 return await self.http_file_system._cat_file(path, start, end, **kwargs)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/fsspec/implementations/http.py:247, in _cat_file()
246 out = await r.read()
--> 247 self._raise_not_found_for_status(r, url)
248 return out
File ~/.conda/envs/osdf/lib/python3.11/site-packages/fsspec/implementations/http.py:229, in _raise_not_found_for_status()
228 if response.status == 404:
--> 229 raise FileNotFoundError(url)
230 response.raise_for_status()
FileNotFoundError: https://ncar-cache.nationalresearchplatform.org:8443/ncar-gdex/d010096/cmip6-pds/CMIP6/CMIP/NOAA-GFDL/GFDL-ESM4/historical/r1i1p1f1/Amon/tas/gr1/v20190726/.zgroup
During handling of the above exception, another exception occurred:
ValueError Traceback (most recent call last)
Cell In[18], line 1
----> 1 dsets_ = dask.compute(dict(dsets))[0]
File ~/.conda/envs/osdf/lib/python3.11/site-packages/dask/base.py:685, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
682 expr = expr.optimize()
683 keys = list(flatten(expr.__dask_keys__()))
--> 685 results = schedule(expr, keys, **kwargs)
687 return repack(results)
Cell In[17], line 20, in open_dset()
18 mapper = fsspec.get_mapper(df.zstore.values[0])
19 #path = df.zstore.values[0][7:]+".zmetadata"
---> 20 ds = xr.open_zarr(mapper, consolidated=True,zarr_format=2)
21 return drop_all_bounds(ds)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/zarr.py:1586, in open_zarr()
1572 raise TypeError(
1573 "open_zarr() got unexpected keyword arguments " + ",".join(kwargs.keys())
1574 )
1576 backend_kwargs = {
1577 "synchronizer": synchronizer,
1578 "consolidated": consolidated,
(...) 1583 "zarr_format": zarr_format,
1584 }
-> 1586 ds = open_dataset(
1587 filename_or_obj=store,
1588 group=group,
1589 decode_cf=decode_cf,
1590 mask_and_scale=mask_and_scale,
1591 decode_times=decode_times,
1592 concat_characters=concat_characters,
1593 decode_coords=decode_coords,
1594 engine="zarr",
1595 chunks=chunks,
1596 drop_variables=drop_variables,
1597 create_default_indexes=create_default_indexes,
1598 chunked_array_type=chunked_array_type,
1599 from_array_kwargs=from_array_kwargs,
1600 backend_kwargs=backend_kwargs,
1601 decode_timedelta=decode_timedelta,
1602 use_cftime=use_cftime,
1603 zarr_version=zarr_version,
1604 use_zarr_fill_value_as_mask=use_zarr_fill_value_as_mask,
1605 )
1606 return ds
File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/api.py:606, in open_dataset()
594 decoders = _resolve_decoders_kwargs(
595 decode_cf,
596 open_backend_dataset_parameters=backend.open_dataset_parameters,
(...) 602 decode_coords=decode_coords,
603 )
605 overwrite_encoded_chunks = kwargs.pop("overwrite_encoded_chunks", None)
--> 606 backend_ds = backend.open_dataset(
607 filename_or_obj,
608 drop_variables=drop_variables,
609 **decoders,
610 **kwargs,
611 )
612 ds = _dataset_from_backend_dataset(
613 backend_ds,
614 filename_or_obj,
(...) 625 **kwargs,
626 )
627 return ds
File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/zarr.py:1660, in open_dataset()
1658 filename_or_obj = _normalize_path(filename_or_obj)
1659 if not store:
-> 1660 store = ZarrStore.open_group(
1661 filename_or_obj,
1662 group=group,
1663 mode=mode,
1664 synchronizer=synchronizer,
1665 consolidated=consolidated,
1666 consolidate_on_close=False,
1667 chunk_store=chunk_store,
1668 storage_options=storage_options,
1669 zarr_version=zarr_version,
1670 use_zarr_fill_value_as_mask=None,
1671 zarr_format=zarr_format,
1672 cache_members=cache_members,
1673 )
1675 store_entrypoint = StoreBackendEntrypoint()
1676 with close_on_error(store):
File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/zarr.py:714, in open_group()
688 @classmethod
689 def open_group(
690 cls,
(...) 707 cache_members: bool = True,
708 ):
709 (
710 zarr_group,
711 consolidate_on_close,
712 close_store_on_close,
713 use_zarr_fill_value_as_mask,
--> 714 ) = _get_open_params(
715 store=store,
716 mode=mode,
717 synchronizer=synchronizer,
718 group=group,
719 consolidated=consolidated,
720 consolidate_on_close=consolidate_on_close,
721 chunk_store=chunk_store,
722 storage_options=storage_options,
723 zarr_version=zarr_version,
724 use_zarr_fill_value_as_mask=use_zarr_fill_value_as_mask,
725 zarr_format=zarr_format,
726 )
728 return cls(
729 zarr_group,
730 mode,
(...) 739 cache_members=cache_members,
740 )
File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/backends/zarr.py:1864, in _get_open_params()
1860 group = open_kwargs.pop("path")
1862 if consolidated:
1863 # TODO: an option to pass the metadata_key keyword
-> 1864 zarr_root_group = zarr.open_consolidated(store, **open_kwargs)
1865 elif consolidated is None:
1866 # same but with more error handling in case no consolidated metadata found
1867 try:
File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/api/synchronous.py:238, in open_consolidated()
233 def open_consolidated(*args: Any, use_consolidated: Literal[True] = True, **kwargs: Any) -> Group:
234 """
235 Alias for [`open_group`][zarr.api.synchronous.open_group] with ``use_consolidated=True``.
236 """
237 return Group(
--> 238 sync(async_api.open_consolidated(*args, use_consolidated=use_consolidated, **kwargs))
239 )
File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/core/sync.py:159, in sync()
156 return_result = next(iter(finished)).result()
158 if isinstance(return_result, BaseException):
--> 159 raise return_result
160 else:
161 return return_result
File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/core/sync.py:119, in _runner()
114 """
115 Await a coroutine and return the result of running it. If awaiting the coroutine raises an
116 exception, the exception will be returned.
117 """
118 try:
--> 119 return await coro
120 except Exception as ex:
121 return ex
File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/api/asynchronous.py:415, in open_consolidated()
410 if use_consolidated is not True:
411 raise TypeError(
412 "'use_consolidated' must be 'True' in 'open_consolidated'. Use 'open' with "
413 "'use_consolidated=False' to bypass consolidated metadata."
414 )
--> 415 return await open_group(*args, use_consolidated=use_consolidated, **kwargs)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/api/asynchronous.py:866, in open_group()
864 try:
865 if mode in _READ_MODES:
--> 866 return await AsyncGroup.open(
867 store_path, zarr_format=zarr_format, use_consolidated=use_consolidated
868 )
869 except (KeyError, FileNotFoundError):
870 pass
File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/core/group.py:548, in open()
545 if use_consolidated or use_consolidated is None:
546 paths.append(store_path / consolidated_key)
--> 548 zgroup_bytes, zattrs_bytes, *rest = await asyncio.gather(
549 *[path.get() for path in paths]
550 )
551 if zgroup_bytes is None:
552 raise FileNotFoundError(store_path)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/storage/_common.py:168, in get()
166 if prototype is None:
167 prototype = default_buffer_prototype()
--> 168 return await self.store.get(self.path, prototype=prototype, byte_range=byte_range)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/zarr/storage/_fsspec.py:289, in get()
287 try:
288 if byte_range is None:
--> 289 value = prototype.buffer.from_bytes(await self.fs._cat_file(path))
290 elif isinstance(byte_range, RangeByteRequest):
291 value = prototype.buffer.from_bytes(
292 await self.fs._cat_file(
293 path,
(...) 296 )
297 )
File ~/.conda/envs/osdf/lib/python3.11/site-packages/pelicanfs/core.py:1158, in wrapper()
1156 except Exception as e:
1157 if not self.direct_reads:
-> 1158 self._bad_cache(data_url, e)
1159 raise
1160 if not self.direct_reads:
File ~/.conda/envs/osdf/lib/python3.11/site-packages/pelicanfs/core.py:740, in _bad_cache()
738 if not namespace_info:
739 return
--> 740 namespace_info.cache_manager.bad_cache(bad_cache)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/pelicanfs/core.py:178, in bad_cache()
176 bad_cache_url = cache_url_parsed.geturl()
177 with self._lock:
--> 178 self._cache_list.remove(bad_cache_url)
ValueError: list.remove(x): x not in list#calculate global means
def get_lat_name(ds):
for lat_name in ['lat', 'latitude']:
if lat_name in ds.coords:
return lat_name
raise RuntimeError("Couldn't find a latitude coordinate")
def global_mean(ds):
lat = ds[get_lat_name(ds)]
weight = np.cos(np.deg2rad(lat))
weight /= weight.mean()
other_dims = set(ds.dims) - {'time'}
return (ds * weight).mean(other_dims)expt_da = xr.DataArray(expts, dims='experiment_id', name='experiment_id',
coords={'experiment_id': expts})
dsets_aligned = {}
for k, v in tqdm(dsets_.items()):
expt_dsets = v.values()
if any([d is None for d in expt_dsets]):
print(f"Missing experiment for {k}")
continue
for ds in expt_dsets:
ds.coords['year'] = ds.time.dt.year
# workaround for
# https://github.com/pydata/xarray/issues/2237#issuecomment-620961663
dsets_ann_mean = [v[expt].pipe(global_mean).swap_dims({'time': 'year'})
.drop_vars('time').coarsen(year=12).mean()
for expt in expts]
# align everything with the 4xCO2 experiment
dsets_aligned[k] = xr.concat(dsets_ann_mean, join='outer',dim=expt_da)%%time
with progress.ProgressBar():
dsets_aligned_ = dask.compute(dsets_aligned)[0]source_ids = list(dsets_aligned_.keys())
source_da = xr.DataArray(source_ids, dims='source_id', name='source_id',
coords={'source_id': source_ids})
big_ds = xr.concat([ds.reset_coords(drop=True)
for ds in dsets_aligned_.values()],
dim=source_da)
big_dsObservational time series data for comparison with ensemble spread¶
The observational data we will use is the HadCRUT5 dataset from the UK Met Office
The data has been downloaded to NCAR’s Geoscience Data Exchange (GDEX) from https://
www .metoffice .gov .uk /hadobs /hadcrut5/ We will use an OSDF file system to access this copy from the GDEX
osdf_fs = OSDFFileSystem()
print(osdf_fs)
#
obs_url = '/ncar/gdex/d850001/HadCRUT.5.0.2.0.analysis.summary_series.global.monthly.nc'
#
obs_ds = xr.open_dataset(osdf_fs.open(obs_url,mode='rb'),engine='h5netcdf').tas_mean
obs_ds# obs_url = 'osdf:///ncar/gdex/d850001/HadCRUT.5.0.2.0.analysis.summary_series.global.monthly.zarr'
# print(obs_url)
# #
# obs_ds = xr.open_zarr(obs_url).tas_mean
# # obs_ds# Compute annual mean temperatures anomalies
obs_gmsta = obs_ds.resample(time='YS').mean(dim='time')
obs_gmstaSection 4: Data Analysis¶
Calculate Global Mean Surface Temperature Anomaly (GMSTA)
Grab some observations/ ERA5 reanalysis data
Convert xarray datasets to dataframes
Use Seaborn to plot GMSTA
df_all = big_ds.to_dataframe().reset_index()
df_all.head()# Compute anomaly w.r.t 1960-1990 baseline
# Define the baseline period
baseline_df = df_all[(df_all["year"] >= 1960) & (df_all["year"] <= 1990)]
# Compute the baseline mean
baseline_mean = baseline_df["tas"].mean()
# Compute anomalies
df_all["tas_anomaly"] = df_all["tas"] - baseline_mean
df_allobs_df = obs_gmsta.to_dataframe(name='tas_anomaly').reset_index()
# obs_df# Convert 'time' to 'year' (keeping only the year)
obs_df['year'] = obs_df['time'].dt.year
# Drop the original 'time' column since we extracted 'year'
obs_df = obs_df[['year', 'tas_anomaly']]
obs_df# Create the main plot
g = sns.relplot(data=df_all, x="year", y="tas_anomaly",
hue='experiment_id', kind="line", errorbar="sd", aspect=2, palette="Set2") # Adjust the color palette)
# Get the current axis from the FacetGrid
ax = g.ax
# Overlay the observational data in red
sns.lineplot(data=obs_df, x="year", y="tas_anomaly",color="red",
linestyle="dashed", linewidth=2,label="Observations", ax=ax)
# Adjust the legend to include observations
ax.legend(title="Experiment ID + Observations")
# Show the plot
plt.show()cluster.close()