Calculate the Global Mean of Surface Temperatures from the JRA-3Q reanalysis dataset hosted on NCAR's GDEX
import glob
import os
import re

import intake
import intake_esm
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy as sp
import xarray as xr

# Set up your scratch folder path.
# (The original fused this comment onto the "import os" line and misspelled
# "scratch"; imports are now grouped stdlib / third-party per PEP 8.)
username = os.environ["USER"]
glade_scratch = "/glade/derecho/scratch/" + username
print(glade_scratch)
#
# Intake-ESM catalog for the JRA-3Q dataset on NCAR's GDEX, served via OSDF.
# (A stray pasted scratch path fused onto the alternative-URL comment in the
# original has been removed.)
jra3q_catalog_url = 'https://data.gdex.ucar.edu/d640000/catalogs/d640000-osdf.json'
# Alternative HTTPS catalog:
# jra3q_catalog_url = 'https://osdf-director.osg-htc.org/ncar/gdex/d640000/catalogs/d640000-https.json'
# GMST function ###
# calculate global means
def get_lat_name(ds):
    """Return the name of the latitude coordinate in *ds*.

    Checks the common spellings 'lat' and 'latitude'; raises
    RuntimeError if neither is present in ``ds.coords``.
    """
    found = next(
        (name for name in ('lat', 'latitude') if name in ds.coords),
        None,
    )
    if found is None:
        raise RuntimeError("Couldn't find a latitude coordinate")
    return found
def global_mean(ds):
    """Return the area-weighted global mean of *ds*, keeping 'time'.

    Weights each grid point by cos(latitude) — proportional to cell area
    on a regular lat/lon grid — normalizes the weights to mean 1, then
    averages over every dimension except 'time'.
    """
    lat = ds[get_lat_name(ds)]
    weight = np.cos(np.deg2rad(lat))
    weight /= weight.mean()
    other_dims = set(ds.dims) - {'time'}
    return (ds * weight).mean(other_dims)


# Create a PBS cluster (the original dump fused this cell onto the return
# statement above, which is invalid Python in a script).
USE_PBS_SCHEDULER = True
def get_pbs_cluster():
    """Create a cluster through dask_jobqueue's PBSCluster on the Casper queue."""
    from dask_jobqueue import PBSCluster

    spill_dir = glade_scratch + '/dask/spill'
    log_dir = glade_scratch + '/dask/logs/'
    return PBSCluster(
        job_name='dask-osdf-24',
        cores=1,
        memory='4GiB',
        processes=1,
        local_directory=spill_dir,
        log_directory=log_dir,
        resource_spec='select=1:ncpus=1:mem=4GB',
        queue='casper',
        walltime='3:00:00',
        # interface = 'ib0'
        interface='ext',
    )
def get_gateway_cluster():
    """Create an adaptive (2-4 worker) cluster through dask_gateway."""
    from dask_gateway import Gateway

    cluster = Gateway().new_cluster()
    cluster.adapt(minimum=2, maximum=4)
    return cluster
def get_local_cluster():
    """Create a 6-worker cluster using the Jupyter server's own resources."""
    # The original also imported performance_report here but never used it;
    # the unused import has been dropped.
    from distributed import LocalCluster

    cluster = LocalCluster()
    cluster.scale(6)
    return cluster
# Obtain a Dask cluster in one of three ways.
# USE_DASK_GATEWAY was never defined in the original notebook, so the elif
# branch raised NameError whenever USE_PBS_SCHEDULER was False; default it
# here without clobbering a value set earlier in the session.
USE_DASK_GATEWAY = globals().get('USE_DASK_GATEWAY', False)

if USE_PBS_SCHEDULER:
    cluster = get_pbs_cluster()
elif USE_DASK_GATEWAY:
    cluster = get_gateway_cluster()
else:
    cluster = get_local_cluster()

# Connect to the cluster.
# (The port-in-use warning and scheduler-deserialization error text pasted
# after this line in the notebook dump were output, not code, and have been
# removed.)
from distributed import Client

client = Client(cluster)
# Scale the cluster, block until the workers are up, then display the
# cluster dashboard widget ("Loading..." in the original was notebook
# display residue, not code).
n_workers = 5
cluster.scale(n_workers)
client.wait_for_workers(n_workers=n_workers)
cluster
# Open the intake-esm catalog and explore its contents (the "Loading..."
# fragments fused onto these lines were notebook display residue, not code).
jra3q_cat = intake.open_esm_datastore(jra3q_catalog_url)
jra3q_cat
jra3q_cat.df.head()

# Unique (variable, long_name) pairs available in the catalog.
jra3q_unique_vars = jra3q_cat.df[['variable', 'long_name']].drop_duplicates()
jra3q_unique_vars
jra3q_unique_vars[1:100]
Filter variables whose long_name starts with "temp"
# Wildcard search on long_name for temperature-like variables ("Loading..."
# on the original line was notebook display residue, not code).
temp_cat = jra3q_cat.search(long_name='temp*')
temp_cat.df
Let us pick the 2 m air temperature and compute the GMST.
Currently intake-esm does not support the kerchunk engine, so we use a pull-request branch that adds it.
# Select the 2 m temperature variable and open it lazily via kerchunk.
jra3q_temps = jra3q_cat.search(variable='tmp2m-hgt-an-gauss')
jra3q_datasets = jra3q_temps.to_dataset_dict(
    xarray_open_kwargs={'engine': 'kerchunk', 'chunks': {}}
)

# Keys are built as 'variable.short_name'; this search yields exactly one
# entry ('tmp2m-hgt-an-gauss.tmp2m-hgt-an-gauss'). The original dump fused
# the keys() inspection, its printed output, and this assignment onto one
# line, which has been untangled here.
jra3q_datasets.keys()
ds = jra3q_datasets['tmp2m-hgt-an-gauss.tmp2m-hgt-an-gauss']

# Quick-look map of the first time step.
ds['tmp2m-hgt-an-gauss'].isel(time=0).plot(cmap='coolwarm')
# %%time  (IPython cell magic in the notebook; not valid in a plain script)
# A quick calculation of the global mean surface temperature hourly series.
# NOTE(review): this takes an UNWEIGHTED mean over lat/lon rather than the
# cos(lat)-weighted global_mean() defined above — confirm that is intended.
# (The "CPU times"/"Wall time" text fused onto this line was timing output,
# not code.)
da_tmp2m = (
    ds['tmp2m-hgt-an-gauss']
    .sel(time=slice('2025-01-01', '2025-09-25'))
    .mean(dim=['lat', 'lon'])
    .compute()
)
---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/aiohttp/client_reqrep.py:539, in start()
538 protocol = self._protocol
--> 539 message, payload = await protocol.read() # type: ignore[union-attr]
540 except http.HttpProcessingError as exc:
File ~/.conda/envs/osdf/lib/python3.11/site-packages/aiohttp/streams.py:703, in read()
702 try:
--> 703 await self._waiter
704 except (asyncio.CancelledError, asyncio.TimeoutError):
CancelledError:
The above exception was the direct cause of the following exception:
TimeoutError Traceback (most recent call last)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/fsspec/asyn.py:56, in _runner()
55 try:
---> 56 result[0] = await coro
57 except Exception as ex:
File ~/.conda/envs/osdf/lib/python3.11/site-packages/fsspec/implementations/http.py:245, in _cat_file()
244 session = await self.set_session()
--> 245 async with session.get(self.encode_url(url), **kw) as r:
246 out = await r.read()
File ~/.conda/envs/osdf/lib/python3.11/site-packages/aiohttp/client.py:1510, in __aenter__()
1509 async def __aenter__(self) -> _RetType:
-> 1510 self._resp: _RetType = await self._coro
1511 return await self._resp.__aenter__()
File ~/.conda/envs/osdf/lib/python3.11/site-packages/aiohttp/client.py:779, in _request()
778 try:
--> 779 resp = await handler(req)
780 # Client connector errors should not be retried
File ~/.conda/envs/osdf/lib/python3.11/site-packages/aiohttp/client.py:757, in _connect_and_send_request()
756 try:
--> 757 await resp.start(conn)
758 except BaseException:
File ~/.conda/envs/osdf/lib/python3.11/site-packages/aiohttp/client_reqrep.py:534, in start()
532 self._connection = connection
--> 534 with self._timer:
535 while True:
536 # read response
File ~/.conda/envs/osdf/lib/python3.11/site-packages/aiohttp/helpers.py:713, in __exit__()
712 return None
--> 713 raise asyncio.TimeoutError from exc_val
714 return None
TimeoutError:
The above exception was the direct cause of the following exception:
FSTimeoutError Traceback (most recent call last)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/distributed/scheduler.py:4873, in update_graph()
4872 try:
-> 4873 expr = deserialize(expr_ser.header, expr_ser.frames)
4874 del expr_ser
File ~/.conda/envs/osdf/lib/python3.11/site-packages/distributed/protocol/serialize.py:452, in deserialize()
451 dumps, loads, wants_context = families[name]
--> 452 return loads(header, frames)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/distributed/protocol/serialize.py:111, in pickle_loads()
106 buffers = [
107 ensure_writeable_flag(ensure_memoryview(mv), w)
108 for mv, w in zip(buffers, header["writeable"])
109 ]
--> 111 return pickle.loads(pik, buffers=buffers)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/distributed/protocol/pickle.py:95, in loads()
94 else:
---> 95 return pickle.loads(x)
96 except Exception:
File ~/.conda/envs/osdf/lib/python3.11/site-packages/fsspec/spec.py:33, in make_instance()
32 def make_instance(cls, args, kwargs):
---> 33 return cls(*args, **kwargs)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/fsspec/spec.py:84, in __call__()
83 else:
---> 84 obj = super().__call__(*args, **kwargs, **strip_tokenize_options)
85 # Setting _fs_token here causes some static linters to complain.
File ~/.conda/envs/osdf/lib/python3.11/site-packages/fsspec/implementations/reference.py:755, in __init__()
752 if remote_protocol is None:
753 # get single protocol from references
754 # TODO: warning here, since this can be very expensive?
--> 755 for ref in self.references.values():
756 if callable(ref):
File ~/.conda/envs/osdf/lib/python3.11/site-packages/fsspec/implementations/reference.py:74, in __iter__()
73 continue
---> 74 yield from self._mapping._generate_all_records(field)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/fsspec/implementations/reference.py:378, in _generate_all_records()
377 for record in range(nrec):
--> 378 yield from self._generate_record(field, record)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/fsspec/implementations/reference.py:359, in _generate_record()
358 """The references for a given parquet file of a given field"""
--> 359 refs = self.open_refs(field, record)
360 it = iter(zip(*refs.values()))
File ~/.conda/envs/osdf/lib/python3.11/site-packages/fsspec/implementations/reference.py:181, in open_refs()
180 path = self.url.format(field=field, record=record)
--> 181 data = io.BytesIO(self.fs.cat_file(path))
182 try:
File ~/.conda/envs/osdf/lib/python3.11/site-packages/fsspec/asyn.py:118, in wrapper()
117 self = obj or args[0]
--> 118 return sync(self.loop, func, *args, **kwargs)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/fsspec/asyn.py:101, in sync()
99 if isinstance(return_result, asyncio.TimeoutError):
100 # suppress asyncio.TimeoutError, raise FSTimeoutError
--> 101 raise FSTimeoutError from return_result
102 elif isinstance(return_result, BaseException):
FSTimeoutError:
The above exception was the direct cause of the following exception:
RuntimeError Traceback (most recent call last)
Cell In[16], line 1
----> 1 get_ipython().run_cell_magic('time', '', "# a quick calculation of global mean surface temperature hourly time series\nda_tmp2m = ds['tmp2m-hgt-an-gauss'].sel(time=slice('2025-01-01','2025-09-25')).mean(dim=['lat','lon']).compute()\n")
File ~/.conda/envs/osdf/lib/python3.11/site-packages/IPython/core/interactiveshell.py:2572, in InteractiveShell.run_cell_magic(self, magic_name, line, cell)
2570 with self.builtin_trap:
2571 args = (magic_arg_s, cell)
-> 2572 result = fn(*args, **kwargs)
2574 # The code below prevents the output from being displayed
2575 # when using magics with decorator @output_can_be_silenced
2576 # when the last Python token in the expression is a ';'.
2577 if getattr(fn, magic.MAGIC_OUTPUT_CAN_BE_SILENCED, False):
File ~/.conda/envs/osdf/lib/python3.11/site-packages/IPython/core/magics/execution.py:1447, in ExecutionMagics.time(self, line, cell, local_ns)
1445 if interrupt_occured:
1446 if exit_on_interrupt and captured_exception:
-> 1447 raise captured_exception
1448 return
1449 return out
File ~/.conda/envs/osdf/lib/python3.11/site-packages/IPython/core/magics/execution.py:1411, in ExecutionMagics.time(self, line, cell, local_ns)
1409 st = clock2()
1410 try:
-> 1411 exec(code, glob, local_ns)
1412 out = None
1413 # multi-line %%time case
File <timed exec>:2
File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/core/dataarray.py:1242, in DataArray.compute(self, **kwargs)
1212 """Trigger loading data into memory and return a new dataarray.
1213
1214 Data will be computed and/or loaded from disk or a remote source.
(...) 1239 Variable.compute
1240 """
1241 new = self.copy(deep=False)
-> 1242 return new.load(**kwargs)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/core/dataarray.py:1168, in DataArray.load(self, **kwargs)
1138 def load(self, **kwargs) -> Self:
1139 """Trigger loading data into memory and return this dataarray.
1140
1141 Data will be computed and/or loaded from disk or a remote source.
(...) 1166 Variable.load
1167 """
-> 1168 ds = self._to_temp_dataset().load(**kwargs)
1169 new = self._from_temp_dataset(ds)
1170 self._variable = new._variable
File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/core/dataset.py:558, in Dataset.load(self, **kwargs)
555 chunkmanager = get_chunked_array_type(*chunked_data.values())
557 # evaluate all the chunked arrays simultaneously
--> 558 evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute(
559 *chunked_data.values(), **kwargs
560 )
562 for k, data in zip(chunked_data, evaluated_data, strict=False):
563 self.variables[k].data = data
File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/namedarray/daskmanager.py:85, in DaskManager.compute(self, *data, **kwargs)
80 def compute(
81 self, *data: Any, **kwargs: Any
82 ) -> tuple[np.ndarray[Any, _DType_co], ...]:
83 from dask.array import compute
---> 85 return compute(*data, **kwargs)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/dask/base.py:685, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
682 expr = expr.optimize()
683 keys = list(flatten(expr.__dask_keys__()))
--> 685 results = schedule(expr, keys, **kwargs)
687 return repack(results)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/xarray/core/indexing.py:670, in __array__()
666 def __array__(
667 self, dtype: DTypeLike | None = None, /, *, copy: bool | None = None
668 ) -> np.ndarray:
669 if Version(np.__version__) >= Version("2.0.0"):
--> 670 return np.asarray(self.get_duck_array(), dtype=dtype, copy=copy)
671 else:
672 return np.asarray(self.get_duck_array(), dtype=dtype)
File ~/.conda/envs/osdf/lib/python3.11/site-packages/distributed/client.py:2420, in _gather()
2418 exception = st.exception
2419 traceback = st.traceback
-> 2420 raise exception.with_traceback(traceback)
2421 if errors == "skip":
2422 bad_keys.add(key)
RuntimeError: Error during deserialization of the task graph. This frequently
occurs if the Scheduler and Client have different environments.
For more information, see
https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments
# Create a customized time-series plot of the global-mean 2 m temperature.
fig = plt.figure(figsize=(10, 4))
ax = fig.add_axes([0, 0, 1, 1])

# Plot the time series.
da_tmp2m.plot(ax=ax, color='steelblue', linewidth=1.5)

# Customize the plot.
ax.set_title('Global Mean 2m Temperature - JRA3Q', fontsize=14, fontweight='bold')
ax.set_xlabel('Time', fontsize=12)
ax.set_ylabel('2m Temperature (K)', fontsize=12)
ax.grid(True, alpha=0.3, linestyle='--')
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)

# Shut down the Dask cluster (this call was fused onto the line above in
# the original dump).
cluster.close()