Stream: python-questions
Topic: intake simplecache timeout
Matt Long (Sep 22 2021 at 10:25):
I have an intake catalog:
```yaml
fgapo_CarbonScope_apo99_v2020:
  driver: netcdf
  description: "Using an atmospheric inversion technique similar to the Jena CO2 inversion, sea-air oxygen exchanges have been estimated from atmospheric O2/N2 measurements (Rödenbeck et al., 2008). In order to eliminate the influence of land processes on atmospheric oxygen abundance, the tracer Atmospheric Potential Oxygen (APO) has been used (Stephens et al., 1998)."
  args:
    urlpath: simplecache::http://www.bgc-jena.mpg.de/CarboScope/apo/INVERSION/OUTPUT/apo99_v2020.nc
    xarray_kwargs:
      decode_times: true
      chunks: {}
    storage_options:
      simplecache:
        same_names: true
        cache_storage: "{{ env(INTAKE_LOCAL_CACHE_DIR) }}"
```
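(As an aside: the `simplecache::http://…` urlpath uses fsspec's URL chaining, where protocols are stacked with `::` and each layer gets its own entry under `storage_options`. A toy illustration of how the layers line up with the option keys — not fsspec's actual parser:)

```python
# Toy illustration of fsspec-style URL chaining (not fsspec's real parser):
# protocols are stacked with "::", and the storage_options keys in the
# catalog above ("simplecache", and implicitly "http") line up with them.
def split_chained_url(urlpath):
    """Split a chained URL into its protocol layers, outermost first."""
    layers = urlpath.split("::")
    return [layer.split("://")[0] for layer in layers]

url = "simplecache::http://www.bgc-jena.mpg.de/CarboScope/apo/INVERSION/OUTPUT/apo99_v2020.nc"
print(split_chained_url(url))  # ['simplecache', 'http']
```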
However, when attempting to load the data, I get a timeout error:
```
---------------------------------------------------------------------------
FSTimeoutError                            Traceback (most recent call last)
/glade/scratch/mclong/tmp/ipykernel_302817/1914989288.py in <module>
----> 1 ds = cat.fgapo_CarbonScope_apo99_v2020.to_dask().compute()
      2 ds
/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/intake_xarray/base.py in to_dask(self)
     67     def to_dask(self):
     68         """Return xarray object where variables are dask arrays"""
---> 69         return self.read_chunked()
     70
     71     def close(self):
/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/intake_xarray/base.py in read_chunked(self)
     42     def read_chunked(self):
     43         """Return xarray object (which will have chunks)"""
---> 44         self._load_metadata()
     45         return self._ds
     46
/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/intake/source/base.py in _load_metadata(self)
    234         """load metadata only if needed"""
    235         if self._schema is None:
--> 236             self._schema = self._get_schema()
    237             self.dtype = self._schema.dtype
    238             self.shape = self._schema.shape
/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/intake_xarray/base.py in _get_schema(self)
     16
     17         if self._ds is None:
---> 18             self._open_dataset()
     19
     20             metadata = {
/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/intake_xarray/netcdf.py in _open_dataset(self)
     85
     86         if self._can_be_local:
---> 87             url = fsspec.open_local(self.urlpath, **self.storage_options)
     88         else:
     89             # https://github.com/intake/filesystem_spec/issues/476#issuecomment-732372918
/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/fsspec/core.py in open_local(url, mode, **storage_options)
    462             " has attribute local_file=True"
    463         )
--> 464     with of as files:
    465         paths = [f.name for f in files]
    466     if isinstance(url, str) and not has_magic(url):
/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/fsspec/core.py in __enter__(self)
    177         if hasattr(fs, "open_many"):
    178             # check for concurrent cache download; or set up for upload
--> 179             self.files = fs.open_many(self)
    180             return self.files
    181         if hasattr(fs, "fs") and fs.fs is not None:
/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/fsspec/implementations/cached.py in <lambda>(*args, **kw)
    392             # it calls `_open`, but is actually in superclass
    393             return lambda *args, **kw: getattr(type(self), item).__get__(self)(
--> 394                 *args, **kw
    395             )
    396         if item in ["__reduce_ex__"]:
/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/fsspec/implementations/cached.py in open_many(self, open_files)
    500         if downpath:
    501             # skip if all files are already cached and up to date
--> 502             self.fs.get(downpath, downfn)
    503
    504         # update metadata - only happens when downloads are successful
/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/fsspec/asyn.py in wrapper(*args, **kwargs)
     86     def wrapper(*args, **kwargs):
     87         self = obj or args[0]
---> 88         return sync(self.loop, func, *args, **kwargs)
     89
     90     return wrapper
/glade/work/mclong/miniconda3/envs/sno/lib/python3.7/site-packages/fsspec/asyn.py in sync(loop, func, timeout, *args, **kwargs)
     65     if isinstance(result[0], asyncio.TimeoutError):
     66         # suppress asyncio.TimeoutError, raise FSTimeoutError
---> 67         raise FSTimeoutError
     68     if isinstance(result[0], BaseException):
     69         raise result[0]
FSTimeoutError:
```
I presume this is because some default parameter in fsspec sets a timeout for file transfers. Since this file is ~1.5 GB, it can't transfer the whole thing in time (though I can see it did transfer some of the data).
Does anybody know how to specify a different value for the timeout?
Anderson Banihirwe (Sep 22 2021 at 17:04):
@Matt Long,
It's possible to override the default timeout. However, I don't know whether this is something one can encode in the YAML file, because it requires creating a `ClientTimeout` object from the `aiohttp` library...
You can modify the timeout as follows:
```python
import aiohttp

timeout = aiohttp.ClientTimeout(total=1000)  # total timeout, in seconds

storage_options = {
    'http': {'client_kwargs': {'timeout': timeout}},
    'simplecache': {
        'same_names': True,
        'cache_storage': "{{ env(INTAKE_LOCAL_CACHE_DIR) }}",
    },
}
```
Once you have the `storage_options` dict, you can invoke the `to_dask()` method as follows:
```python
ds = cat.fgapo_CarbonScope_apo99_v2020(storage_options=storage_options).to_dask()
```
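(For reference, the same `storage_options` shape can be handed straight to fsspec, bypassing intake. This is a hedged sketch: it assumes `aiohttp` and `fsspec` are installed, and `/tmp/intake-cache` is a placeholder cache directory, not anything from the thread.)

```python
import aiohttp
import fsspec

# Same idea as above, but passing the per-protocol options directly to
# fsspec. "/tmp/intake-cache" is a placeholder; use your own cache dir.
timeout = aiohttp.ClientTimeout(total=3600)  # total timeout, in seconds
of = fsspec.open(
    "simplecache::http://www.bgc-jena.mpg.de/CarboScope/apo/INVERSION/OUTPUT/apo99_v2020.nc",
    http={"client_kwargs": {"timeout": timeout}},
    simplecache={"same_names": True, "cache_storage": "/tmp/intake-cache"},
)
# `of` is a lazy OpenFile; no download happens until it is actually opened.
```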
Matt Long (Sep 22 2021 at 18:18):
Thanks @Anderson Banihirwe!
It seems like you should be able to do something like:
```yaml
storage_options:
  http:
    client_kwargs:
      timeout: 3600
```
but indeed, that doesn't work (`AttributeError: 'int' object has no attribute 'total'`), like you said it wouldn't.
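(The error makes sense mechanistically: somewhere inside aiohttp the timeout value is accessed via its `.total` attribute, which a plain integer from YAML doesn't have. A toy stand-in — not aiohttp's actual code — reproducing the same failure mode:)

```python
# Toy stand-in for aiohttp's internal access of timeout.total
# (illustration only; this is not aiohttp code).
def needs_timeout_object(timeout):
    return timeout.total

class TimeoutLike:
    """Minimal stand-in for aiohttp.ClientTimeout."""
    def __init__(self, total):
        self.total = total

try:
    needs_timeout_object(3600)  # plain int, as in the YAML attempt above
except AttributeError as err:
    print(err)  # 'int' object has no attribute 'total'

print(needs_timeout_object(TimeoutLike(total=3600)))  # 3600
```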
Last updated: Jan 30 2022 at 12:01 UTC