# Display output of plots directly in Notebook
import intake
import numpy as np
import pandas as pd
import xarray as xr
import aiohttp
import time
from contextlib import contextmanager
import matplotlib.pyplot as plt
import seaborn as sns
import os
import fsspec.implementations.http as fshttp
from pelicanfs.core import PelicanFileSystem, PelicanMap, OSDFFileSystem
import dask
from dask_jobqueue import PBSCluster
from dask.distributed import Client
from dask.distributed import performance_report
init_year0 = '1991'
init_year1 = '2020'
final_year0 = '2071'
final_year1 = '2100'
# # This overwrites the default scheduler with a single-threaded scheduler
# dask.config.set(scheduler='synchronous')
# File paths
rda_scratch = '/gpfs/csfs1/collections/rda/scratch/harshah'
rda_url = 'https://data.rda.ucar.edu/'
database_num = 'd345001'
cam6_dart_url = rda_url + database_num
#
https_catalog = cam6_dart_url + '/catalogs/https/'+ database_num +'-https-zarr.json'
osdf_catalog = cam6_dart_url + '/catalogs/osdf/' + database_num + '-osdf-zarr.json'
Create a Dask cluster¶
Dask Introduction¶
Dask is a library for scaling Python workloads. It mirrors the interfaces of popular scientific libraries such as numpy, pandas, and xarray, which provides an easier path to parallel processing without having to refactor code.
There are three components to parallel processing with Dask: the client, the scheduler, and the workers.
The Client is best envisioned as the application that sends work to the Dask cluster. In Python applications this is handled when the client is defined with client = Client(CLUSTER_TYPE). A Dask cluster consists of a single scheduler that manages the execution of tasks on workers. The CLUSTER_TYPE can be defined in a number of different ways.
There is LocalCluster, created directly in Python with dask.distributed, which runs on the same hardware as the application and shares its available resources.
In certain JupyterHubs, Dask Gateway may be available, and a dedicated Dask cluster with its own resources can be created dynamically with dask.gateway.
On HPC systems, dask_jobqueue is used to provision resources through job schedulers such as Slurm, PBS, or HTCondor.
The dask.distributed Client can also connect to an existing cluster. A Dask scheduler and workers can be deployed in containers, or on Kubernetes, without using a Python function to create the cluster; the Client is then configured to connect to the scheduler by container name or by Kubernetes service name.
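A minimal sketch of that last case; the scheduler address below is a placeholder rather than a service deployed for this notebook:
from dask.distributed import Client
# Hypothetical address of an already-running scheduler (container name or Kubernetes service name)
existing_client = Client("tcp://my-dask-scheduler:8786")
print(existing_client)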
Select the Dask cluster type¶
The default will be LocalCluster, as that can run on any system.
If running on an HPC system with a PBS scheduler, set to True. Otherwise, set to False.
USE_PBS_SCHEDULER = False
If running on a Jupyter server with Dask Gateway configured, set to True. Otherwise, set to False.
USE_DASK_GATEWAY = False
Python function for a PBS cluster¶
# Create a PBS cluster object
def get_pbs_cluster():
""" Create cluster through dask_jobqueue.
"""
from dask_jobqueue import PBSCluster
cluster = PBSCluster(
job_name = 'dask-osdf-24',
cores = 1,
memory = '4GiB',
processes = 1,
local_directory = rda_scratch + '/dask/spill',
log_directory = rda_scratch + '/dask/logs/',
resource_spec = 'select=1:ncpus=1:mem=4GB',
queue = 'casper',
walltime = '3:00:00',
#interface = 'ib0'
interface = 'ext'
)
    return cluster
Python function for a Gateway Cluster¶
def get_gateway_cluster():
""" Create cluster through dask_gateway
"""
from dask_gateway import Gateway
gateway = Gateway()
cluster = gateway.new_cluster()
cluster.adapt(minimum=2, maximum=4)
    return cluster
Python function for a Local Cluster¶
def get_local_cluster():
""" Create cluster using the Jupyter server's resources
"""
from distributed import LocalCluster, performance_report
cluster = LocalCluster()
cluster.scale(4)
    return cluster
Python logic to select the Dask Cluster type¶
This selects the cluster type using the True/False flags set in the previous cells.
# Obtain dask cluster in one of three ways
if USE_PBS_SCHEDULER:
cluster = get_pbs_cluster()
elif USE_DASK_GATEWAY:
cluster = get_gateway_cluster()
else:
cluster = get_local_cluster()
# Connect to cluster
from distributed import Client
client = Client(cluster)
/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 40229 instead
warnings.warn(
# Scale the cluster and display cluster dashboard URL
# cluster.scale(4)
cluster
Access the data from NCAR’s Research Data Archive using intake¶
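The HTTPS catalog below lists plain https:// URLs, while the OSDF catalog lists osdf:// paths; pelicanfs (imported at the top) is what lets fsspec-based readers resolve the latter. As a hypothetical illustration, one of the catalogued stores could be opened directly, bypassing intake, assuming pelicanfs has registered the osdf protocol with fsspec:
import xarray as xr
# Direct open of a single catalogued store (illustrative only; the notebook itself uses intake below)
ds_ps_direct = xr.open_zarr("osdf:///ncar/rda/d345001/weekly/PS.zarr", consolidated=True)
print(ds_ps_direct)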
df_https_test = intake.open_esm_datastore(https_catalog)
df_https_test.df['path'].values
array(['https://data.rda.ucar.edu/d345001/hourly6/HR.zarr',
'https://data.rda.ucar.edu/d345001/hourly6/TSA.zarr',
'https://data.rda.ucar.edu/d345001/hourly6/EFLX_LH_TOT.zarr',
'https://data.rda.ucar.edu/d345001/hourly6/ER.zarr',
'https://data.rda.ucar.edu/d345001/weekly/VS.zarr',
'https://data.rda.ucar.edu/d345001/weekly/PS.zarr',
'https://data.rda.ucar.edu/d345001/weekly/Q.zarr',
'https://data.rda.ucar.edu/d345001/weekly/US.zarr',
'https://data.rda.ucar.edu/d345001/weekly/CLDICE.zarr',
'https://data.rda.ucar.edu/d345001/weekly/T.zarr',
'https://data.rda.ucar.edu/d345001/weekly/CLDLIQ.zarr'],
      dtype=object)
df_osdf_test = intake.open_esm_datastore(osdf_catalog)
df_osdf_test.df['path'].values
array(['osdf:///ncar/rda/d345001/hourly6/HR.zarr',
'osdf:///ncar/rda/d345001/hourly6/TSA.zarr',
'osdf:///ncar/rda/d345001/hourly6/EFLX_LH_TOT.zarr',
'osdf:///ncar/rda/d345001/hourly6/ER.zarr',
'osdf:///ncar/rda/d345001/weekly/VS.zarr',
'osdf:///ncar/rda/d345001/weekly/PS.zarr',
'osdf:///ncar/rda/d345001/weekly/Q.zarr',
'osdf:///ncar/rda/d345001/weekly/US.zarr',
'osdf:///ncar/rda/d345001/weekly/CLDICE.zarr',
'osdf:///ncar/rda/d345001/weekly/T.zarr',
'osdf:///ncar/rda/d345001/weekly/CLDLIQ.zarr'], dtype=object)
data_var = 'PS'
col_subset_https = df_https_test.search(variable=data_var)
col_subset_osdf = df_osdf_test.search(variable=data_var)
dsets_https = col_subset_https.to_dataset_dict(zarr_kwargs={"consolidated": True})
#
print(f"\nDataset dictionary keys:\n {dsets_https.keys()}")
# Load the first dataset and display a summary.
dataset_key = list(dsets_https.keys())[0]
#
ds_https = dsets_https[dataset_key]
dsets_osdf = col_subset_osdf.to_dataset_dict()
ds_osdf = dsets_osdf[dataset_key]
ds_osdf = ds_osdf.PS
ds_https = ds_https.PS
ds_osdf
Data Access Speed tests¶
- We will now test how long it takes to access data of various sizes from the above array via the OSDF and HTTPS-only protocols, following the pattern sketched below.
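The core measurement is simple: time a single .load() call and divide the subset size by the runtime. A minimal sketch of one such measurement, assuming the ds_https_1Mb subset prepared in the next section; the benchmark code further below wraps this pattern in a DiagnosticTimer and checkpoints each result to a CSV file.
import time
tic = time.time()
loaded = ds_https_1Mb.copy().load()          # pull one subset into memory over HTTPS
runtime = time.time() - tic
size_mb = loaded.nbytes / (1024 ** 2)        # size of the loaded array in MB
print(f"{size_mb:.1f} MB in {runtime:.2f} s -> {size_mb / runtime:.1f} MB/s")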
Prepare data subsets¶
# Define file path for CSV
csv_file_path = "ncar_benchmark_results.csv"
ds_osdf_1Kb = ds_osdf.isel(lat=0, lon=0, member_id=0).isel(time=np.arange(130))
ds_https_1Kb = ds_https.isel(lat=0, lon=0, member_id=0).isel(time=np.arange(130))
ds_https_1Kb
ds_osdf_1Mb = ds_osdf.isel(time=0).isel(member_id=1 + np.arange(3))
ds_https_1Mb = ds_https.isel(time=0).isel(member_id=1 + np.arange(3))
ds_osdf_1Mb
ds_osdf_10Mb = ds_osdf.isel(member_id=4).isel(time=np.arange(24))
ds_https_10Mb = ds_https.isel(member_id=4).isel(time=np.arange(24))
ds_osdf_10Mb
ds_osdf_100Mb = ds_osdf.isel(member_id=5).isel(time=np.arange(238))
ds_https_100Mb = ds_https.isel(member_id=5).isel(time=np.arange(238))
#ds_osdf_100Mb
ds_osdf_200Mb = ds_osdf.isel(member_id=6)
ds_https_200Mb = ds_https.isel(member_id=6)
#ds_https_200Mb
ds_osdf_400Mb = ds_osdf.isel(member_id=7 + np.arange(2))
ds_https_400Mb = ds_https.isel(member_id=7 + np.arange(2))
ds_osdf_600Mb = ds_osdf.isel(member_id=10 + np.arange(3))
ds_https_600Mb = ds_https.isel(member_id=10 + np.arange(3))
ds_osdf_800Mb = ds_osdf.isel(member_id=14 + np.arange(4))
ds_https_800Mb = ds_https.isel(member_id=14 + np.arange(4))
ds_osdf_1Gb = ds_osdf.isel(member_id=19 + np.arange(6)).isel(time=np.arange(410))
ds_https_1Gb = ds_https.isel(member_id=19 + np.arange(6)).isel(time=np.arange(410))
ds_osdf_1Gb
# ds_osdf_10Gb = ds_osdf.isel(member_id = 12 + np.arange(52))
# ds_https_10Gb = ds_https.isel(member_id = 12 + np.arange(52))
# ds_osdf_10Gb
Now access data and plot¶
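Before timing the loads, the nominal sizes encoded in the variable names can be checked from the lazy arrays themselves; a minimal sanity check, assuming the subsets defined above (nbytes is available without loading any data):
for name, subset in [("1Mb", ds_osdf_1Mb), ("10Mb", ds_osdf_10Mb),
                     ("100Mb", ds_osdf_100Mb), ("1Gb", ds_osdf_1Gb)]:
    print(f"{name}: {subset.nbytes / (1024 ** 2):.1f} MB")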
ds_osdf_list = [ds_osdf_1Mb,ds_osdf_10Mb,ds_osdf_100Mb,ds_osdf_200Mb,ds_osdf_400Mb,
ds_osdf_600Mb,ds_osdf_800Mb,ds_osdf_1Gb]
ds_https_list = [ds_https_1Mb,ds_https_10Mb,ds_https_100Mb,ds_https_200Mb,ds_https_400Mb,
                 ds_https_600Mb,ds_https_800Mb,ds_https_1Gb]
# Number of data access calls
num_calls = 7 # Modify this as needed
n_workers = 4 # Set this to your preferred number of workers
# DiagnosticTimer class to keep track of runtimes
class DiagnosticTimer:
def __init__(self):
self.diagnostics = []
@contextmanager
def time(self, **kwargs):
tic = time.time()
yield
toc = time.time()
kwargs["runtime"] = toc - tic
self.diagnostics.append(kwargs)
def dataframe(self):
return pd.DataFrame(self.diagnostics)
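    # Example usage (illustrative): each `with diag_timer.time(...)` block appends one row
    # holding the keyword arguments plus the measured runtime, e.g.
    #     with diag_timer.time(dataset_size=1.0, protocol="HTTPS-only", call_number=1):
    #         ds_https_1Mb.copy().load()
    #     diag_timer.dataframe()   # -> columns: dataset_size, protocol, call_number, runtime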
# Initialize the DiagnosticTimer
diag_timer = DiagnosticTimer()
# Function to check existing CSV file and determine missing runs
def load_existing_results():
if os.path.exists(csv_file_path):
# Load existing CSV into DataFrame
existing_df = pd.read_csv(csv_file_path)
else:
# Create an empty DataFrame if the file does not exist
existing_df = pd.DataFrame(columns=["dataset_size", "protocol", "call_number", "runtime", "MBps"])
return existing_df
def filter_missing_runs(datasets, protocol_name, existing_df):
# Convert dataset sizes to MB for checking, using a list of tuples
dataset_sizes_mb = [(dataset, dataset.nbytes / (1024 ** 2)) for dataset in datasets]
# Identify missing dataset sizes and calls
filtered_datasets = []
for dataset, dataset_size_mb in dataset_sizes_mb:
for call_num in range(1, num_calls + 1):
# Check if this dataset size and call number combination already exists
if not ((existing_df["dataset_size"] == dataset_size_mb) &
(existing_df["protocol"] == protocol_name) &
(existing_df["call_number"] == call_num)).any():
filtered_datasets.append((dataset, dataset_size_mb, call_num))
    return filtered_datasets
def benchmark_protocol(datasets, protocol_name, cluster=None):
existing_df = load_existing_results() # Load existing results as a checkpoint
# Filter for missing runs based on existing results
missing_runs = filter_missing_runs(datasets, protocol_name, existing_df)
diag_timer = DiagnosticTimer() # Initialize the diagnostic timer
# Process each dataset and call
for (dataset, dataset_size_mb, call_num) in missing_runs:
# Restart the Dask cluster if provided
if cluster is not None:
cluster.scale(0) # Scale down to release worker memory
cluster.scale(n_workers) # Scale up to required number of workers
client.wait_for_workers(n_workers) # Wait for workers to be ready
# Inform the start of processing for this dataset and call
print(f"Starting processing of dataset for protocol '{protocol_name}' (Size: {dataset_size_mb} MB) in call {call_num}")
# Only count the time for loading dataset into memory
dataset_copy = dataset.copy()
with diag_timer.time(dataset_size=dataset_size_mb, protocol=protocol_name, call_number=call_num):
dataset_copy.load() # Load the dataset into memory
# Convert the single call result to a DataFrame and add MBps column
call_result_df = diag_timer.dataframe().iloc[[-1]].copy() # Get the latest diagnostic entry
call_result_df["MBps"] = call_result_df["dataset_size"] / call_result_df["runtime"]
# Append this call's result to CSV
call_result_df.to_csv(csv_file_path, mode='a', header=not os.path.exists(csv_file_path), index=False)
print(f"Appended results for protocol '{protocol_name}', call {call_num} to '{csv_file_path}'")
# Print statement after finishing each call
print(f"Finished processing dataset for protocol '{protocol_name}' in call {call_num}")
# def benchmark_protocol(datasets, protocol_name, cluster=None):
# for index, dataset in enumerate(datasets):
# # Calculate dataset size in MB for logging
# dataset_size_mb = dataset.nbytes / (1024 ** 2)
# # Each dataset will be loaded multiple times to capture caching effect
# for call_num in range(num_calls):
# if cluster is not None:
# # Scale down to zero workers to clear memory
# cluster.scale(0) # Stop all workers
# cluster.scale(n_workers) # Scale up to the required number of workers
# client.wait_for_workers(n_workers) # Wait for the workers to be ready
# # Only count the time for loading dataset into memory, excluding cluster scaling time
# dataset_copy = dataset.copy()
# with diag_timer.time(dataset_size=dataset_size_mb, protocol=protocol_name, call_number=call_num + 1):
# dataset_copy.load() # Load the dataset into memory
#             print(f" Finished processing dataset {index + 1} in {call_num + 1} th call")
# Run benchmark for each protocol
benchmark_protocol(ds_https_list, "HTTPS-only",cluster=None)
benchmark_protocol(ds_osdf_list, "OSDF-director",cluster=None)
# Convert diagnostics to a DataFrame for analysis
df_diagnostics = diag_timer.dataframe()
# Calculate MB/s for each run
df_diagnostics['MBps'] = df_diagnostics['dataset_size'] / df_diagnostics['runtime']
df_diagnostics
Starting processing of dataset for protocol 'HTTPS-only' (Size: 198.703125 MB) in call 4
Appended results for protocol 'HTTPS-only', call 4 to 'ncar_benchmark_results.csv'
Finished processing dataset for protocol 'HTTPS-only' in call 4
Starting processing of dataset for protocol 'HTTPS-only' (Size: 198.703125 MB) in call 5
2024-11-14 13:32:46,347 - distributed.worker - WARNING - Compute Failed
Key: ('open_dataset-PS-getitem-711552ab7e23c8bfdb0adbf5fe7f6b35', 2, 3, 3)
State: executing
Function: getter
args: (ImplicitToExplicitIndexingAdapter(array=CopyOnWriteArray(array=LazilyIndexedArray(array=<xarray.backends.zarr.ZarrArrayWrapper object at 0x148ed6e43cc0>, key=BasicIndexer((slice(None, None, None), slice(None, None, None), slice(None, None, None), slice(None, None, None)))))), (6, slice(160, 240, None), slice(96, 128, None), slice(96, 128, None)))
kwargs: {}
Exception: 'ClientPayloadError("Response payload is not completed: <TransferEncodingError: 400, message=\'Not enough data for satisfy transfer length header.\'>")'
---------------------------------------------------------------------------
TransferEncodingError Traceback (most recent call last)
File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/aiohttp/client_proto.py:94, in connection_lost()
93 try:
---> 94 uncompleted = self._parser.feed_eof()
95 except Exception as underlying_exc:
File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/aiohttp/_http_parser.pyx:507, in aiohttp._http_parser.HttpParser.feed_eof()
506 if self._cparser.flags & cparser.F_CHUNKED:
--> 507 raise TransferEncodingError(
508 "Not enough data for satisfy transfer length header.")
TransferEncodingError: 400, message:
Not enough data for satisfy transfer length header.
The above exception was the direct cause of the following exception:
ClientPayloadError Traceback (most recent call last)
Cell In[37], line 2
1 # Run benchmark for each protocol
----> 2 benchmark_protocol(ds_https_list, "HTTPS-only",cluster=None)
3 benchmark_protocol(ds_osdf_list, "OSDF-director",cluster=None)
5 # Convert diagnostics to a DataFrame for analysis
Cell In[35], line 22, in benchmark_protocol(datasets, protocol_name, cluster)
20 dataset_copy = dataset.copy()
21 with diag_timer.time(dataset_size=dataset_size_mb, protocol=protocol_name, call_number=call_num):
---> 22 dataset_copy.load() # Load the dataset into memory
24 # Convert the single call result to a DataFrame and add MBps column
25 call_result_df = diag_timer.dataframe().iloc[[-1]].copy() # Get the latest diagnostic entry
    (...)
File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/aiohttp/streams.py:312, in _wait()
    310 try:
    311     with self._timer:
--> 312         await waiter
    313 finally:
    314     self._waiter = None
ClientPayloadError: Response payload is not completed: <TransferEncodingError: 400, message='Not enough data for satisfy transfer length header.'>
# Plotting MBps vs data size for each protocol and call type
# Define different alpha values for each protocol
alpha_values = {"HTTPS-only": 0.8, "OSDF-director": 0.5} # Adjust transparency as needed
marker_style = {"HTTPS-only": "o", "OSDF-director": "x"} # Define different markers for each protocol
#
fig, ax = plt.subplots(figsize=(10, 6))
for protocol in ["HTTPS-only", "OSDF-director"]:
# First access (call_number == 1)
first_access = df_diagnostics[(df_diagnostics['protocol'] == protocol) & (df_diagnostics['call_number'] == 1)]
    ax.scatter(first_access['dataset_size'], first_access['MBps'], label=f"{protocol} - First Access",
               alpha=alpha_values[protocol], marker=marker_style[protocol], s=64)  # scatter takes s (marker area), not markersize
# Subsequent access (call_number > 1)
subsequent_access = df_diagnostics[(df_diagnostics['protocol'] == protocol) & (df_diagnostics['call_number'] > 1)]
subsequent_access_avg = subsequent_access.groupby('dataset_size')['MBps'].mean()
ax.plot(subsequent_access_avg.index, subsequent_access_avg.values,
linestyle='--', label=f"{protocol} - Subsequent Access (Avg)",alpha=alpha_values[protocol],marker=marker_style[protocol],markersize=8)
# Customize plot appearance
ax.set_xlabel("Data Size (MB)")
ax.set_ylabel("Data Access Speed (MBps)")
ax.set_title("NCAR origin benchmark")
ax.legend()
plt.show()
# Convert dataset size to categorical to control the order in the plot
df_diagnostics['dataset_size'] = df_diagnostics['dataset_size'].astype("category")
# Set the order for dataset sizes to appear in ascending order
size_order = sorted(df_diagnostics['dataset_size'].unique())
# Create the box plot
plt.figure(figsize=(12, 6))
sns.boxplot(
data=df_diagnostics,
x="dataset_size",
y="MBps",
hue="protocol",
order=size_order
)
# Customize plot appearance
plt.xlabel("Data Size (MB)")
plt.ylabel("Data Access Speed (MBps)")
plt.title("NCAR to UWMadison, Dask: 4x4GiB, 7 requests")
plt.legend(title="Protocol")
plt.show()
Try with a specific cache¶
# historical_smbb_test1 = historical_smbb.isel(time=0).isel(member_id =1+ np.arange(5))
# historical_smbb_test1
# %%timeit -r2 -n3 -o
# historical_smbb_test1.compute()
# #Try using a specific cache
# sdsc_cache='https://sdsc-cache.nationalresearchplatform.org:8443/aws-opendata/us-west-2/ncar-cesm2-lens/atm/monthly/'+\
#             'cesm2LE-historical-smbb-TREFHTMX.zarr'
# %%time
# test_1 = xr.open_zarr(sdsc_cache).TREFHTMX.isel(time=0)
# test_1
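A minimal sketch of the cache test above, written as plain timed Python rather than cell magics and using the same URL as the commented cell; whether this endpoint currently serves the store depends on the cache configuration:
import time
import xarray as xr
sdsc_cache = ('https://sdsc-cache.nationalresearchplatform.org:8443/aws-opendata/us-west-2/'
              'ncar-cesm2-lens/atm/monthly/cesm2LE-historical-smbb-TREFHTMX.zarr')
tic = time.time()
test_1 = xr.open_zarr(sdsc_cache).TREFHTMX.isel(time=0).load()
print(f"Loaded one time slice in {time.time() - tic:.1f} s")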