Stream: python-questions

Topic: intake simplecache timeout


Matt Long (Sep 22 2021 at 10:25):

I have an intake catalog:

  fgapo_CarbonScope_apo99_v2020:
    driver: netcdf
    description: "Using an atmospheric inversion technique similar to the Jena CO2 inversion, sea-air oxygen exchanges have been estimated from atmospheric O2/N2 measurements (Rödenbeck et al., 2008). In order to eliminate the influence of land processes on atmospheric oxygen abundance, the tracer Atmospheric Potential Oxygen (APO) has been used (Stephens et al., 1998)."
    args:
      urlpath: simplecache::http://www.bgc-jena.mpg.de/CarboScope/apo/INVERSION/OUTPUT/apo99_v2020.nc
      xarray_kwargs:
        decode_times: true
      chunks: {}
      storage_options:
        simplecache:
          same_names: true
          cache_storage: "{{ env(INTAKE_LOCAL_CACHE_DIR) }}"

However, when attempting to load the data, I get a timeout error:

---------------------------------------------------------------------------
FSTimeoutError                            Traceback (most recent call last)
/glade/scratch/mclong/tmp/ipykernel_302817/1914989288.py in <module>
----> 1 ds = cat.fgapo_CarbonScope_apo99_v2020.to_dask().compute()
      2 ds

/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/intake_xarray/base.py in to_dask(self)
     67     def to_dask(self):
     68         """Return xarray object where variables are dask arrays"""
---> 69         return self.read_chunked()
     70
     71     def close(self):

/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/intake_xarray/base.py in read_chunked(self)
     42     def read_chunked(self):
     43         """Return xarray object (which will have chunks)"""
---> 44         self._load_metadata()
     45         return self._ds
     46

/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/intake/source/base.py in _load_metadata(self)
    234         """load metadata only if needed"""
    235         if self._schema is None:
--> 236             self._schema = self._get_schema()
    237             self.dtype = self._schema.dtype
    238             self.shape = self._schema.shape

/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/intake_xarray/base.py in _get_schema(self)
     16
     17         if self._ds is None:
---> 18             self._open_dataset()
     19
     20             metadata = {

/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/intake_xarray/netcdf.py in _open_dataset(self)
     85
     86         if self._can_be_local:
---> 87             url = fsspec.open_local(self.urlpath, **self.storage_options)
     88         else:
     89             # https://github.com/intake/filesystem_spec/issues/476#issuecomment-732372918

/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/fsspec/core.py in open_local(url, mode, **storage_options)
    462             " has attribute local_file=True"
    463         )
--> 464     with of as files:
    465         paths = [f.name for f in files]
    466     if isinstance(url, str) and not has_magic(url):

/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/fsspec/core.py in __enter__(self)
    177             if hasattr(fs, "open_many"):
    178                 # check for concurrent cache download; or set up for upload
--> 179                 self.files = fs.open_many(self)
    180                 return self.files
    181             if hasattr(fs, "fs") and fs.fs is not None:

/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/fsspec/implementations/cached.py in <lambda>(*args, **kw)
    392             # it calls `_open`, but is actually in superclass
    393             return lambda *args, **kw: getattr(type(self), item).__get__(self)(
--> 394                 *args, **kw
    395             )
    396         if item in ["__reduce_ex__"]:

/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/fsspec/implementations/cached.py in open_many(self, open_files)
    500         if downpath:
    501             # skip if all files are already cached and up to date
--> 502             self.fs.get(downpath, downfn)
    503
    504             # update metadata - only happens when downloads are successful

/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/fsspec/asyn.py in wrapper(*args, **kwargs)
     86     def wrapper(*args, **kwargs):
     87         self = obj or args[0]
---> 88         return sync(self.loop, func, *args, **kwargs)
     89
     90     return wrapper

/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/fsspec/asyn.py in sync(loop, func, timeout, *args, **kwargs)
     65     if isinstance(result[0], asyncio.TimeoutError):
     66         # suppress asyncio.TimeoutError, raise FSTimeoutError
---> 67         raise FSTimeoutError
     68     if isinstance(result[0], BaseException):
     69         raise result[0]

FSTimeoutError:

I presume this is because fsspec applies some default timeout to file transfers. The file is ~1.5 GB, so the transfer cannot finish in time (though I can see that part of the data was transferred).

Does anybody know how to specify a different value for the timeout?

Anderson Banihirwe (Sep 22 2021 at 17:04):

@Matt Long,

It's possible to override the default timeout. However, I don't know whether this can be encoded in the YAML file, because it requires creating a ClientTimeout object with the aiohttp library.

You can modify the timeout as follows:

import aiohttp
timeout = aiohttp.ClientTimeout(total=1000)  # total time allowed for the whole request, in seconds
storage_options = {'http': {'client_kwargs': {'timeout': timeout}}, 'simplecache': {'same_names': True, 'cache_storage': "{{ env(INTAKE_LOCAL_CACHE_DIR) }}"}}

Once you have the storage_options dict, you can invoke the to_dask() method as follows:

ds = cat.fgapo_CarbonScope_apo99_v2020(storage_options=storage_options).to_dask()

Matt Long (Sep 22 2021 at 18:18):

Thanks @Anderson Banihirwe!

It seems like you should be able to do something like:

      storage_options:
        http:
          client_kwargs:
            timeout: 3600

but indeed, that doesn't work (AttributeError: 'int' object has no attribute 'total') like you said it wouldn't.
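
The AttributeError suggests the HTTP layer reads a .total attribute from whatever is passed as timeout, so a bare number from YAML cannot stand in for a ClientTimeout object. One possible workaround, sketched below, is to keep the number in a plain dict and convert it in Python before passing storage_options to the catalog entry. The helper name coerce_http_timeout and its make_timeout parameter are hypothetical, not part of intake, fsspec, or aiohttp; the only library call assumed is aiohttp.ClientTimeout(total=...), as used earlier in this thread.

```python
import copy


def coerce_http_timeout(storage_options, make_timeout=None):
    """Return a copy of storage_options with a numeric HTTP timeout wrapped.

    A plain number found under http -> client_kwargs -> timeout is treated as
    a total number of seconds and passed to make_timeout. Anything else is
    left untouched. (Hypothetical helper, for illustration only.)
    """
    if make_timeout is None:
        # Imported lazily so that the helper only needs aiohttp when the
        # default conversion is actually used.
        import aiohttp

        def make_timeout(seconds):
            return aiohttp.ClientTimeout(total=seconds)

    # Work on a copy so the caller's dict is not modified.
    options = copy.deepcopy(storage_options)
    client_kwargs = options.get("http", {}).get("client_kwargs", {})
    timeout = client_kwargs.get("timeout")
    # bool is a subclass of int, so exclude it explicitly.
    if isinstance(timeout, (int, float)) and not isinstance(timeout, bool):
        client_kwargs["timeout"] = make_timeout(timeout)
    return options


# Hypothetical usage with the catalog entry from this thread:
#
#   raw = {
#       "http": {"client_kwargs": {"timeout": 3600}},
#       "simplecache": {"same_names": True,
#                       "cache_storage": os.environ["INTAKE_LOCAL_CACHE_DIR"]},
#   }
#   opts = coerce_http_timeout(raw)
#   ds = cat.fgapo_CarbonScope_apo99_v2020(storage_options=opts).to_dask()
```

This keeps the timeout value as a plain number wherever it is configured and leaves the aiohttp-specific object creation to a single place in Python; whether intake could do this conversion itself from the YAML is not something this thread settles.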


Last updated: Jan 30 2022 at 12:01 UTC