Skip to article frontmatterSkip to article content

Access DART data from NCAR’s data origin and benchmark

# Display output of plots directly in Notebook
import intake
import numpy as np
import pandas as pd
import xarray as xr
import aiohttp
import time
from contextlib import contextmanager
import matplotlib.pyplot as plt
import seaborn as sns
import os
import fsspec.implementations.http as fshttp
from pelicanfs.core import PelicanFileSystem, PelicanMap, OSDFFileSystem 
import dask 
from dask_jobqueue import PBSCluster
from dask.distributed import Client
from dask.distributed import performance_report
init_year0  = '1991'
init_year1  = '2020'
final_year0 = '2071'
final_year1 = '2100'
# # This overwrites the default scheduler with a single-threaded scheduler
# dask.config.set(scheduler='synchronous')  
# File paths
rda_scratch   = '/gpfs/csfs1/collections/rda/scratch/harshah'
rda_url       =  'https://data.rda.ucar.edu/'
database_num  = 'd345001'
cam6_dart_url = rda_url + database_num
#
https_catalog = cam6_dart_url + '/catalogs/https/'+ database_num +'-https-zarr.json'
osdf_catalog  = cam6_dart_url + '/catalogs/osdf/'+ database_num +'-osdf-zarr.json'

Create a Dask cluster

Dask Introduction

Dask is a solution that enables the scaling of Python libraries. It mimics popular scientific libraries such as numpy, pandas, and xarray that enables an easier path to parallel processing without having to refactor code.

There are 3 components to parallel processing with Dask: the client, the scheduler, and the workers.

The Client is best envisioned as the application that sends information to the Dask cluster. In Python applications this is handled when the client is defined with client = Client(CLUSTER_TYPE). A Dask cluster comprises of a single scheduler that manages the execution of tasks on workers. The CLUSTER_TYPE can be defined in a number of different ways.

There is LocalCluster, a cluster running on the same hardware as the application and sharing the available resources, directly in Python with dask.distributed.

In certain JupyterHubs Dask Gateway may be available and a dedicated dask cluster with its own resources can be created dynamically with dask.gateway.

On HPC systems dask_jobqueue is used to connect to the HPC Slurm, PBS or HTCondor job schedulers to provision resources.

The dask.distributed client python module can also be used to connect to existing clusters. A Dask Scheduler and Workers can be deployed in containers, or on Kubernetes, without using a Python function to create a dask cluster. The dask.distributed Client is configured to connect to the scheduler either by container name, or by the Kubernetes service name.

Select the Dask cluster type

The default will be LocalCluster as that can run on any system.

If running on a HPC computer with a PBS Scheduler, set to True. Otherwise, set to False.

USE_PBS_SCHEDULER = False

If running on Jupyter server with Dask Gateway configured, set to True. Otherwise, set to False.

USE_DASK_GATEWAY = False

Python function for a PBS cluster

# Create a PBS cluster object
def get_pbs_cluster():
    """ Create cluster through dask_jobqueue.   
    """
    from dask_jobqueue import PBSCluster
    cluster = PBSCluster(
        job_name = 'dask-osdf-24',
        cores = 1,
        memory = '4GiB',
        processes = 1,
        local_directory = rda_scratch + '/dask/spill',
        log_directory = rda_scratch + '/dask/logs/',
        resource_spec = 'select=1:ncpus=1:mem=4GB',
        queue = 'casper',
        walltime = '3:00:00',
        #interface = 'ib0'
        interface = 'ext'
    )
    return cluster

Python function for a Gateway Cluster

def get_gateway_cluster():
    """ Create cluster through dask_gateway
    """
    from dask_gateway import Gateway

    gateway = Gateway()
    cluster = gateway.new_cluster()
    cluster.adapt(minimum=2, maximum=4)
    return cluster

Python function for a Local Cluster

def get_local_cluster():
    """ Create cluster using the Jupyter server's resources
    """
    from distributed import LocalCluster, performance_report
    cluster = LocalCluster()    

    cluster.scale(4)
    return cluster

Python logic to select the Dask Cluster type

This uses True/False boolean logic based on the variables set in the previous cells

# Obtain dask cluster in one of three ways

if USE_PBS_SCHEDULER:
    cluster = get_pbs_cluster()
elif USE_DASK_GATEWAY:
    cluster = get_gateway_cluster()
else:
    cluster = get_local_cluster()

# Connect to cluster
from distributed import Client
client = Client(cluster)
/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 40229 instead
  warnings.warn(
# Scale the cluster and display cluster dashboard URL
# cluster.scale(4)
cluster
Loading...

Access the data from NCAR’s Research Data Archive using intake

df_https_test = intake.open_esm_datastore(https_catalog)
df_https_test.df['path'].values
array(['https://data.rda.ucar.edu/d345001/hourly6/HR.zarr', 'https://data.rda.ucar.edu/d345001/hourly6/TSA.zarr', 'https://data.rda.ucar.edu/d345001/hourly6/EFLX_LH_TOT.zarr', 'https://data.rda.ucar.edu/d345001/hourly6/ER.zarr', 'https://data.rda.ucar.edu/d345001/weekly/VS.zarr', 'https://data.rda.ucar.edu/d345001/weekly/PS.zarr', 'https://data.rda.ucar.edu/d345001/weekly/Q.zarr', 'https://data.rda.ucar.edu/d345001/weekly/US.zarr', 'https://data.rda.ucar.edu/d345001/weekly/CLDICE.zarr', 'https://data.rda.ucar.edu/d345001/weekly/T.zarr', 'https://data.rda.ucar.edu/d345001/weekly/CLDLIQ.zarr'], dtype=object)
df_osdf_test = intake.open_esm_datastore(osdf_catalog)
df_osdf_test.df['path'].values
array(['osdf:///ncar/rda/d345001/hourly6/HR.zarr', 'osdf:///ncar/rda/d345001/hourly6/TSA.zarr', 'osdf:///ncar/rda/d345001/hourly6/EFLX_LH_TOT.zarr', 'osdf:///ncar/rda/d345001/hourly6/ER.zarr', 'osdf:///ncar/rda/d345001/weekly/VS.zarr', 'osdf:///ncar/rda/d345001/weekly/PS.zarr', 'osdf:///ncar/rda/d345001/weekly/Q.zarr', 'osdf:///ncar/rda/d345001/weekly/US.zarr', 'osdf:///ncar/rda/d345001/weekly/CLDICE.zarr', 'osdf:///ncar/rda/d345001/weekly/T.zarr', 'osdf:///ncar/rda/d345001/weekly/CLDLIQ.zarr'], dtype=object)
data_var = 'PS'
col_subset_https = df_https_test.search(variable=data_var)
col_subset_osdf  = df_osdf_test.search(variable=data_var)
dsets_https = col_subset_https.to_dataset_dict(zarr_kwargs={"consolidated": True})
#
print(f"\nDataset dictionary keys:\n {dsets_https.keys()}")
# Load the first dataset and display a summary.
dataset_key = list(dsets_https.keys())[0]
#
ds_https = dsets_https[dataset_key]
Loading...
dsets_osdf  = col_subset_osdf.to_dataset_dict()
ds_osdf     = dsets_osdf[dataset_key]
Loading...
ds_osdf  = ds_osdf.PS
ds_https = ds_https.PS
ds_osdf
Loading...

Data Access Speed tests

  • We will now test how long it takes to access data (via OSDF and https-only prrotocols) for various sizes using the above array

Prepare data subsets

# Define file path for CSV
csv_file_path = "ncar_benchmark_results.csv"
ds_osdf_1Kb  = ds_osdf.isel(lat=0,lon=0,member_id=0).isel(time=np.arange(130))
ds_https_1Kb = ds_https.isel(lat=0,lon=0,member_id=0).isel(time=np.arange(130))
ds_https_1Kb
Loading...
ds_osdf_1Mb  = ds_osdf.isel(time=0).isel(member_id =1+ np.arange(3))
ds_https_1Mb = ds_https.isel(time=0).isel(member_id =1+ np.arange(3))
ds_osdf_1Mb
Loading...
ds_osdf_10Mb  = ds_osdf.isel(member_id =4).isel(time=np.arange(24))
ds_https_10Mb = ds_https.isel(member_id =4).isel(time=np.arange(24))
ds_osdf_10Mb
Loading...
ds_osdf_100Mb  = ds_osdf.isel(member_id =5).isel(time=np.arange(238))
ds_https_100Mb = ds_https.isel(member_id =5).isel(time=np.arange(238))
#ds_osdf_100Mb
ds_osdf_200Mb  = ds_osdf.isel(member_id = 6)
ds_https_200Mb = ds_https.isel(member_id =6)
#ds_https_200Mb 
ds_osdf_400Mb  = ds_osdf.isel(member_id = 7 +np.arange(2))
ds_https_400Mb = ds_https.isel(member_id =7 + np.arange(2))
ds_osdf_600Mb  = ds_osdf.isel(member_id  = 10 +np.arange(3))
ds_https_600Mb = ds_https.isel(member_id = 10 + np.arange(3))
ds_osdf_800Mb  = ds_osdf.isel(member_id  = 14 +np.arange(4))
ds_https_800Mb = ds_https.isel(member_id = 14 + np.arange(4))
ds_osdf_1Gb  = ds_osdf.isel(member_id  = 19 + np.arange(6)).isel(time = np.arange(410))
ds_https_1Gb = ds_https.isel(member_id = 19 + np.arange(6)).isel(time = np.arange(410))
ds_osdf_1Gb
Loading...
# ds_osdf_10Gb  = ds_osdf.isel(member_id  = 12 + np.arange(52))
# ds_https_10Gb = ds_https.isel(member_id = 12 + np.arange(52))
# ds_osdf_10Gb

Now access data and plot

ds_osdf_list  = [ds_osdf_1Mb,ds_osdf_10Mb,ds_osdf_100Mb,ds_osdf_200Mb,ds_osdf_400Mb,
                 ds_osdf_600Mb,ds_osdf_800Mb,ds_osdf_1Gb]
ds_https_list = [ds_https_1Mb,ds_https_10Mb,ds_https_100Mb,ds_https_200Mb,ds_https_400Mb,
                 ds_https_600Mb,ds_https_800Mb,ds_https_1Gb]
# Number of data access calls
num_calls = 7  # Modify this as needed
n_workers = 4  # Set this to your preferred number of workers
# DiagnosticTimer class to keep track of runtimes
class DiagnosticTimer:
    def __init__(self):
        self.diagnostics = []

    @contextmanager
    def time(self, **kwargs):
        tic = time.time()
        yield
        toc = time.time()
        kwargs["runtime"] = toc - tic
        self.diagnostics.append(kwargs)

    def dataframe(self):
        return pd.DataFrame(self.diagnostics)

# Initialize the DiagnosticTimer
diag_timer = DiagnosticTimer()
# Function to check existing CSV file and determine missing runs
def load_existing_results():
    if os.path.exists(csv_file_path):
        # Load existing CSV into DataFrame
        existing_df = pd.read_csv(csv_file_path)
    else:
        # Create an empty DataFrame if the file does not exist
        existing_df = pd.DataFrame(columns=["dataset_size", "protocol", "call_number", "runtime", "MBps"])
    return existing_df

def filter_missing_runs(datasets, protocol_name, existing_df):
    # Convert dataset sizes to MB for checking, using a list of tuples
    dataset_sizes_mb = [(dataset, dataset.nbytes / (1024 ** 2)) for dataset in datasets]

    # Identify missing dataset sizes and calls
    filtered_datasets = []
    for dataset, dataset_size_mb in dataset_sizes_mb:
        for call_num in range(1, num_calls + 1):
            # Check if this dataset size and call number combination already exists
            if not ((existing_df["dataset_size"] == dataset_size_mb) &
                    (existing_df["protocol"] == protocol_name) &
                    (existing_df["call_number"] == call_num)).any():
                filtered_datasets.append((dataset, dataset_size_mb, call_num))
    
    return filtered_datasets
def benchmark_protocol(datasets, protocol_name, cluster=None):
    existing_df = load_existing_results()  # Load existing results as a checkpoint

    # Filter for missing runs based on existing results
    missing_runs = filter_missing_runs(datasets, protocol_name, existing_df)
    diag_timer = DiagnosticTimer()  # Initialize the diagnostic timer

    # Process each dataset and call
    for (dataset, dataset_size_mb, call_num) in missing_runs:
        # Restart the Dask cluster if provided
        if cluster is not None:
            cluster.scale(0)  # Scale down to release worker memory
            cluster.scale(n_workers)  # Scale up to required number of workers
            client.wait_for_workers(n_workers)  # Wait for workers to be ready

        # Inform the start of processing for this dataset and call
        print(f"Starting processing of dataset for protocol '{protocol_name}' (Size: {dataset_size_mb} MB) in call {call_num}")

        # Only count the time for loading dataset into memory
        dataset_copy = dataset.copy()
        with diag_timer.time(dataset_size=dataset_size_mb, protocol=protocol_name, call_number=call_num):
            dataset_copy.load()  # Load the dataset into memory

        # Convert the single call result to a DataFrame and add MBps column
        call_result_df = diag_timer.dataframe().iloc[[-1]].copy()  # Get the latest diagnostic entry
        call_result_df["MBps"] = call_result_df["dataset_size"] / call_result_df["runtime"]

        # Append this call's result to CSV
        call_result_df.to_csv(csv_file_path, mode='a', header=not os.path.exists(csv_file_path), index=False)
        print(f"Appended results for protocol '{protocol_name}', call {call_num} to '{csv_file_path}'")

        # Print statement after finishing each call
        print(f"Finished processing dataset for protocol '{protocol_name}' in call {call_num}")
# def benchmark_protocol(datasets, protocol_name, cluster=None):
#     for index, dataset in enumerate(datasets):
#         # Calculate dataset size in MB for logging
#         dataset_size_mb = dataset.nbytes / (1024 ** 2)
        
#         # Each dataset will be loaded multiple times to capture caching effect
#         for call_num in range(num_calls):
#             if cluster is not None:
#                 # Scale down to zero workers to clear memory
#                 cluster.scale(0)  # Stop all workers
#                 cluster.scale(n_workers)  # Scale up to the required number of workers
#                 client.wait_for_workers(n_workers)  # Wait for the workers to be ready

#             # Only count the time for loading dataset into memory, excluding cluster scaling time
#             dataset_copy = dataset.copy()
#             with diag_timer.time(dataset_size=dataset_size_mb, protocol=protocol_name, call_number=call_num + 1):
#                 dataset_copy.load()  # Load the dataset into memory
#             print(f" Finished processing dataset {index + 1} in {call_num + 1} th call") 
# Run benchmark for each protocol
benchmark_protocol(ds_https_list, "HTTPS-only",cluster=None)
benchmark_protocol(ds_osdf_list, "OSDF-director",cluster=None)

# Convert diagnostics to a DataFrame for analysis
df_diagnostics = diag_timer.dataframe()

# Calculate MB/s for each run
df_diagnostics['MBps'] = df_diagnostics['dataset_size'] / df_diagnostics['runtime']
df_diagnostics
Starting processing of dataset for protocol 'HTTPS-only' (Size: 198.703125 MB) in call 4
Appended results for protocol 'HTTPS-only', call 4 to 'ncar_benchmark_results.csv'
Finished processing dataset for protocol 'HTTPS-only' in call 4
Starting processing of dataset for protocol 'HTTPS-only' (Size: 198.703125 MB) in call 5
2024-11-14 13:32:46,347 - distributed.worker - WARNING - Compute Failed
Key:       ('open_dataset-PS-getitem-711552ab7e23c8bfdb0adbf5fe7f6b35', 2, 3, 3)
State:     executing
Function:  getter
args:      (ImplicitToExplicitIndexingAdapter(array=CopyOnWriteArray(array=LazilyIndexedArray(array=<xarray.backends.zarr.ZarrArrayWrapper object at 0x148ed6e43cc0>, key=BasicIndexer((slice(None, None, None), slice(None, None, None), slice(None, None, None), slice(None, None, None)))))), (6, slice(160, 240, None), slice(96, 128, None), slice(96, 128, None)))
kwargs:    {}
Exception: 'ClientPayloadError("Response payload is not completed: <TransferEncodingError: 400, message=\'Not enough data for satisfy transfer length header.\'>")'
Traceback: '  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/dask/array/core.py", line 118, in getter\n    c = np.asarray(c)\n        ^^^^^^^^^^^^^\n  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/xarray/core/indexing.py", line 573, in __array__\n    return np.asarray(self.get_duck_array(), dtype=dtype)\n                      ^^^^^^^^^^^^^^^^^^^^^\n  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/xarray/core/indexing.py", line 576, in get_duck_array\n    return self.array.get_duck_array()\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/xarray/core/indexing.py", line 787, in get_duck_array\n    return self.array.get_duck_array()\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/xarray/core/indexing.py", line 650, in get_duck_array\n    array = self.array[self.key]\n            ~~~~~~~~~~^^^^^^^^^^\n  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/xarray/backends/zarr.py", line 104, in __getitem__\n    return indexing.explicit_indexing_adapter(\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/xarray/core/indexing.py", line 1014, in explicit_indexing_adapter\n    result = raw_indexing_method(raw_key.tuple)\n             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/xarray/backends/zarr.py", line 94, in _getitem\n    return self._array[key]\n           ~~~~~~~~~~~^^^^^\n  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/zarr/core.py", line 798, in __getitem__\n    result = self.get_orthogonal_selection(pure_selection, fields=fields)\n             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/zarr/core.py", line 1080, in get_orthogonal_selection\n    return self._get_selection(indexer=indexer, out=out, fields=fields)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/zarr/core.py", line 1343, in _get_selection\n    self._chunk_getitems(\n  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/zarr/core.py", line 2179, in _chunk_getitems\n    cdatas = self.chunk_store.getitems(ckeys, contexts=contexts)\n             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/zarr/storage.py", line 1435, in getitems\n    raise v\n  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/fsspec/asyn.py", line 245, in _run_coro\n    return await asyncio.wait_for(coro, timeout=timeout), i\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/asyncio/tasks.py", line 520, in wait_for\n    return await fut\n           ^^^^^^^^^\n  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/fsspec/implementations/http.py", line 235, in _cat_file\n    out = await r.read()\n          ^^^^^^^^^^^^^^\n  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/aiohttp/client_reqrep.py", line 1111, in read\n    self._body = await self.content.read()\n                 ^^^^^^^^^^^^^^^^^^^^^^^^^\n  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/aiohttp/streams.py", line 383, in read\n    block = await self.readany()\n            ^^^^^^^^^^^^^^^^^^^^\n  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/aiohttp/streams.py", line 405, in readany\n    await self._wait("readany")\n  File "/glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/aiohttp/streams.py", line 312, in _wait\n    await waiter\n'

---------------------------------------------------------------------------
TransferEncodingError                     Traceback (most recent call last)
File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/aiohttp/client_proto.py:94, in connection_lost()
     93 try:
---> 94     uncompleted = self._parser.feed_eof()
     95 except Exception as underlying_exc:

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/aiohttp/_http_parser.pyx:507, in aiohttp._http_parser.HttpParser.feed_eof()
    506 if self._cparser.flags & cparser.F_CHUNKED:
--> 507     raise TransferEncodingError(
    508         "Not enough data for satisfy transfer length header.")

TransferEncodingError: 400, message:
  Not enough data for satisfy transfer length header.

The above exception was the direct cause of the following exception:

ClientPayloadError                        Traceback (most recent call last)
Cell In[37], line 2
      1 # Run benchmark for each protocol
----> 2 benchmark_protocol(ds_https_list, "HTTPS-only",cluster=None)
      3 benchmark_protocol(ds_osdf_list, "OSDF-director",cluster=None)
      5 # Convert diagnostics to a DataFrame for analysis

Cell In[35], line 22, in benchmark_protocol(datasets, protocol_name, cluster)
     20 dataset_copy = dataset.copy()
     21 with diag_timer.time(dataset_size=dataset_size_mb, protocol=protocol_name, call_number=call_num):
---> 22     dataset_copy.load()  # Load the dataset into memory
     24 # Convert the single call result to a DataFrame and add MBps column
     25 call_result_df = diag_timer.dataframe().iloc[[-1]].copy()  # Get the latest diagnostic entry

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/xarray/core/dataarray.py:1147, in DataArray.load(self, **kwargs)
   1127 def load(self, **kwargs) -> Self:
   1128     """Manually trigger loading of this array's data from disk or a
   1129     remote source into memory and return this array.
   1130 
   (...)
   1145     dask.compute
   1146     """
-> 1147     ds = self._to_temp_dataset().load(**kwargs)
   1148     new = self._from_temp_dataset(ds)
   1149     self._variable = new._variable

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/xarray/core/dataset.py:863, in Dataset.load(self, **kwargs)
    860 chunkmanager = get_chunked_array_type(*lazy_data.values())
    862 # evaluate all the chunked arrays simultaneously
--> 863 evaluated_data: tuple[np.ndarray[Any, Any], ...] = chunkmanager.compute(
    864     *lazy_data.values(), **kwargs
    865 )
    867 for k, data in zip(lazy_data, evaluated_data):
    868     self.variables[k].data = data

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/xarray/namedarray/daskmanager.py:86, in DaskManager.compute(self, *data, **kwargs)
     81 def compute(
     82     self, *data: Any, **kwargs: Any
     83 ) -> tuple[np.ndarray[Any, _DType_co], ...]:
     84     from dask.array import compute
---> 86     return compute(*data, **kwargs)

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/dask/base.py:662, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    659     postcomputes.append(x.__dask_postcompute__())
    661 with shorten_traceback():
--> 662     results = schedule(dsk, keys, **kwargs)
    664 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/xarray/core/indexing.py:573, in __array__()
    572 def __array__(self, dtype: np.typing.DTypeLike = None) -> np.ndarray:
--> 573     return np.asarray(self.get_duck_array(), dtype=dtype)

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/xarray/core/indexing.py:576, in get_duck_array()
    575 def get_duck_array(self):
--> 576     return self.array.get_duck_array()

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/xarray/core/indexing.py:787, in get_duck_array()
    786 def get_duck_array(self):
--> 787     return self.array.get_duck_array()

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/xarray/core/indexing.py:650, in get_duck_array()
    646     array = apply_indexer(self.array, self.key)
    647 else:
    648     # If the array is not an ExplicitlyIndexedNDArrayMixin,
    649     # it may wrap a BackendArray so use its __getitem__
--> 650     array = self.array[self.key]
    652 # self.array[self.key] is now a numpy array when
    653 # self.array is a BackendArray subclass
    654 # and self.key is BasicIndexer((slice(None, None, None),))
    655 # so we need the explicit check for ExplicitlyIndexed
    656 if isinstance(array, ExplicitlyIndexed):

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/xarray/backends/zarr.py:104, in __getitem__()
    102 elif isinstance(key, indexing.OuterIndexer):
    103     method = self._oindex
--> 104 return indexing.explicit_indexing_adapter(
    105     key, array.shape, indexing.IndexingSupport.VECTORIZED, method
    106 )

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/xarray/core/indexing.py:1014, in explicit_indexing_adapter()
    992 """Support explicit indexing by delegating to a raw indexing method.
    993 
    994 Outer and/or vectorized indexers are supported by indexing a second time
   (...)
   1011 Indexing result, in the form of a duck numpy-array.
   1012 """
   1013 raw_key, numpy_indices = decompose_indexer(key, shape, indexing_support)
-> 1014 result = raw_indexing_method(raw_key.tuple)
   1015 if numpy_indices.tuple:
   1016     # index the loaded np.ndarray
   1017     indexable = NumpyIndexingAdapter(result)

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/xarray/backends/zarr.py:94, in _getitem()
     93 def _getitem(self, key):
---> 94     return self._array[key]

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/zarr/core.py:798, in __getitem__()
    796     result = self.vindex[selection]
    797 elif is_pure_orthogonal_indexing(pure_selection, self.ndim):
--> 798     result = self.get_orthogonal_selection(pure_selection, fields=fields)
    799 else:
    800     result = self.get_basic_selection(pure_selection, fields=fields)

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/zarr/core.py:1080, in get_orthogonal_selection()
   1077 # setup indexer
   1078 indexer = OrthogonalIndexer(selection, self)
-> 1080 return self._get_selection(indexer=indexer, out=out, fields=fields)

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/zarr/core.py:1343, in _get_selection()
   1340 if math.prod(out_shape) > 0:
   1341     # allow storage to get multiple items at once
   1342     lchunk_coords, lchunk_selection, lout_selection = zip(*indexer)
-> 1343     self._chunk_getitems(
   1344         lchunk_coords,
   1345         lchunk_selection,
   1346         out,
   1347         lout_selection,
   1348         drop_axes=indexer.drop_axes,
   1349         fields=fields,
   1350     )
   1351 if out.shape:
   1352     return out

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/zarr/core.py:2179, in _chunk_getitems()
   2177     if not isinstance(self._meta_array, np.ndarray):
   2178         contexts = ConstantMap(ckeys, constant=Context(meta_array=self._meta_array))
-> 2179     cdatas = self.chunk_store.getitems(ckeys, contexts=contexts)
   2181 for ckey, chunk_select, out_select in zip(ckeys, lchunk_selection, lout_selection):
   2182     if ckey in cdatas:

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/zarr/storage.py:1435, in getitems()
   1432     continue
   1433 elif isinstance(v, Exception):
   1434     # Raise any other exception
-> 1435     raise v
   1436 else:
   1437     # The function calling this method may not recognize the transformed
   1438     # keys, so we send the values returned by self.map.getitems back into
   1439     # the original key space.
   1440     results[keys_transformed[k]] = v

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/fsspec/asyn.py:245, in _run_coro()
    243 async def _run_coro(coro, i):
    244     try:
--> 245         return await asyncio.wait_for(coro, timeout=timeout), i
    246     except Exception as e:
    247         if not return_exceptions:

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/asyncio/tasks.py:520, in wait_for()
    517         raise TimeoutError from exc
    519 async with timeouts.timeout(timeout):
--> 520     return await fut

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/fsspec/implementations/http.py:235, in _cat_file()
    233 session = await self.set_session()
    234 async with session.get(self.encode_url(url), **kw) as r:
--> 235     out = await r.read()
    236     self._raise_not_found_for_status(r, url)
    237 return out

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/aiohttp/client_reqrep.py:1111, in read()
   1109 if self._body is None:
   1110     try:
-> 1111         self._body = await self.content.read()
   1112         for trace in self._traces:
   1113             await trace.send_response_chunk_received(
   1114                 self.method, self.url, self._body
   1115             )

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/aiohttp/streams.py:383, in read()
    381 blocks = []
    382 while True:
--> 383     block = await self.readany()
    384     if not block:
    385         break

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/aiohttp/streams.py:405, in readany()
    401 # TODO: should be `if` instead of `while`
    402 # because waiter maybe triggered on chunk end,
    403 # without feeding any data
    404 while not self._buffer and not self._eof:
--> 405     await self._wait("readany")
    407 return self._read_nowait(-1)

File /glade/work/harshah/conda-envs/arco_experiments/lib/python3.12/site-packages/aiohttp/streams.py:312, in _wait()
    310 try:
    311     with self._timer:
--> 312         await waiter
    313 finally:
    314     self._waiter = None

ClientPayloadError: Response payload is not completed: <TransferEncodingError: 400, message='Not enough data for satisfy transfer length header.'>
# Plotting MBps vs data size for each protocol and call type
# Define different alpha values for each protocol
alpha_values = {"HTTPS-only": 0.8, "OSDF-director": 0.5}  # Adjust transparency as needed
marker_style = {"HTTPS-only": "o", "OSDF-director": "x"}  # Define different markers for each protocol
#
fig, ax = plt.subplots(figsize=(10, 6))
for protocol in ["HTTPS-only", "OSDF-director"]:
    # First access (call_number == 1)
    first_access = df_diagnostics[(df_diagnostics['protocol'] == protocol) & (df_diagnostics['call_number'] == 1)]
    ax.scatter(first_access['dataset_size'], first_access['MBps'], label=f"{protocol} - First Access",
            alpha=alpha_values[protocol],marker=marker_style[protocol],markersize=8)

    # Subsequent access (call_number > 1)
    subsequent_access = df_diagnostics[(df_diagnostics['protocol'] == protocol) & (df_diagnostics['call_number'] > 1)]
    subsequent_access_avg = subsequent_access.groupby('dataset_size')['MBps'].mean()
    ax.plot(subsequent_access_avg.index, subsequent_access_avg.values, 
            linestyle='--', label=f"{protocol} - Subsequent Access (Avg)",alpha=alpha_values[protocol],marker=marker_style[protocol],markersize=8)
    
# Customize plot appearance
ax.set_xlabel("Data Size (MB)")
ax.set_ylabel("Data Access Speed (MBps)")
ax.set_title("NCAR origin benchmark")
ax.legend()
plt.show()
# Convert dataset size to categorical to control the order in the plot
df_diagnostics['dataset_size'] = df_diagnostics['dataset_size'].astype("category")

# Set the order for dataset sizes to appear in ascending order
size_order = sorted(df_diagnostics['dataset_size'].unique())

# Create the box plot
plt.figure(figsize=(12, 6))
sns.boxplot(
    data=df_diagnostics, 
    x="dataset_size", 
    y="MBps", 
    hue="protocol", 
    order=size_order
)

# Customize plot appearance
plt.xlabel("Data Size (MB)")
plt.ylabel("Data Access Speed (MBps)")
plt.title("NCAR to UWMadison, Dask: 4x4GiB, 7 requests")
plt.legend(title="Protocol")
plt.show()

Try with a specific cache

# historical_smbb_test1 = historical_smbb.isel(time=0).isel(member_id =1+ np.arange(5))
# historical_smbb_test1
# %%timeit -r2 -n3 -o
# historical_smbb_test1.compute()
# #Try using a specific cache
# sdsc_cache='https://sdsc-cache.nationalresearchplatform.org:8443/aws-opendata/us-west-2/ncar-cesm2-lens/atm/monthly/'+\
#             'cesm2LE-historical-smbb-TREFHTMX.zarr'
# %%time
# test_1 = xr.open_zarr(sdsc_cache).TREFHTMX.isel(time=0)
# test_1