Stream: dask

Topic: saving to zarr results in error


Mira Berdahl (Jan 26 2022 at 23:52):

Hi,

I'm trying to save a dataArray to a zarr file, and have been able to do this in the past with similar code. However, lately I've run into this issue which I cannot seem to resolve.

Say my DataArray looks like this:

aw_oceanT_global
xarray.DataArray (time: 3600, z_t: 60)
        Array           Chunk
Bytes   1.65 MiB        480 B
Shape   (3600, 60)      (1, 60)
Count   154875 Tasks    3600 Chunks
Type    float64         numpy.ndarray

I do :

aw_oceanT_global.unify_chunks()
aw_oceanT_global.to_dataset(name='TEMP').chunk({'z_t':-1}).to_zarr('/glade/scratch/mberdahl/127kaH11/TEMP/aw_oceanT_global_timeseries_H11.zarr', mode='w')

But receive the following error (just the top bit of it):
distributed.utils - ERROR - 'str' object has no attribute 'text'
Traceback (most recent call last):
  File "/glade/work/mberdahl/miniconda/envs/pangeo/lib/python3.9/site-packages/distributed/utils.py", line 681, in log_errors
    yield
  File "/glade/work/mberdahl/miniconda/envs/pangeo/lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 346, in update
    self.root.title.text = title
AttributeError: 'str' object has no attribute 'text'
distributed.utils - ERROR - 'str' object has no attribute 'text'
Traceback (most recent call last):
  File "/glade/work/mberdahl/miniconda/envs/pangeo/lib/python3.9/site-packages/distributed/utils.py", line 681, in log_errors
    yield
  File "/glade/work/mberdahl/miniconda/envs/pangeo/lib/python3.9/site-packages/distributed/dashboard/components/scheduler.py", line 3417, in status_doc
    cluster_memory.update()

Looks like I'm running into memory issues, maybe? But this DataArray isn't any bigger than other arrays I've had success with using the same methods. Any thoughts appreciated!
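
As a quick sanity check on the repr above, the reported sizes follow directly from the shape, chunk shape, and dtype. A minimal sketch in plain Python (no xarray or dask needed); the helper name `nbytes` is hypothetical and only used for illustration:

```python
from math import prod

def nbytes(shape, itemsize):
    """Bytes needed for an array of the given shape and element size."""
    return prod(shape) * itemsize

FLOAT64_BYTES = 8

shape = (3600, 60)        # (time, z_t), from the repr above
chunk_shape = (1, 60)     # one chunk per time step

total_bytes = nbytes(shape, FLOAT64_BYTES)
chunk_bytes = nbytes(chunk_shape, FLOAT64_BYTES)
n_chunks = prod(s // c for s, c in zip(shape, chunk_shape))

print(f"total: {total_bytes / 2**20:.2f} MiB")   # compare with "1.65 MiB"
print(f"chunk: {chunk_bytes} B")                 # compare with "480 B"
print(f"chunks: {n_chunks}")                     # compare with "3600 Chunks"
```

The final array is small, so the 154875 tasks in the graph behind it suggest that any memory pressure would come from the upstream computation rather than from the write itself.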

Anderson Banihirwe (Jan 27 2022 at 00:25):

@Mira Berdahl, I don't know why this is failing by looking at the traceback. Can you provide a reproducible test case or a pointer to the notebook you're using?

Mira Berdahl (Jan 27 2022 at 02:51):

@Anderson Banihirwe , the path to the notebook is:
/glade/u/home/mberdahl/FirstLook_LIGruns/DASK_scripts/TryAreaWeightingAvg.ipynb
Thanks for taking a look!

Anderson Banihirwe (Jan 31 2022 at 17:29):

Thanks, @Mira Berdahl! Unfortunately, I haven't been able to fix the issue (I am getting a KilledWorker error when using the full dataset). In the meantime, I suggest trying the following:

1) Reduce the data size you're loading (e.g. load 1/4 of it)

mfds = xr.open_mfdataset(dfiles[:len(dfiles)//4], combine='by_coords', parallel=True , chunks={'time': 6}, data_vars=['TEMP', 'time_bound'], decode_times=False)

2) Ensure the computations are done in single precision (float32) so as to avoid memory usage blowing up:

import numpy as np
import xarray as xr

def weighted_temporal_mean(ds, var, dims=('nlat', 'nlon')):
    """
    weight by days in each month
    """
    #time_bound_diff = ds.time_bnds.diff(dim="nbnds")[:, 0]
    month_length = ds.time.dt.days_in_month
    # Normalize month lengths so the weights sum to 1 within each year;
    # float32 keeps the memory footprint down.
    wgts = (month_length.groupby("time.year") / month_length.groupby("time.year").sum()).astype('float32')
    #wgts = time_bound_diff.groupby("time.year") / time_bound_diff.groupby(
    #    "time.year"
    #).sum(xr.ALL_DIMS)
    # Sanity check: the weights for each year sum to 1.
    np.testing.assert_allclose(wgts.groupby("time.year").sum(xr.ALL_DIMS), 1.0)
    obs = ds[var]
    cond = obs.isnull()
    # 0 where data is missing, 1 elsewhere, so missing months drop out of the normalization.
    ones = xr.where(cond, 0.0, 1.0).astype('float32')
    # Weighted annual sums ("AS" is the year-start resampling frequency).
    obs_sum = (obs * wgts).resample(time="AS").sum(dim="time")
    ones_out = (ones * wgts).resample(time="AS").sum(dim="time")
    #obs_s = (obs_sum / ones_out).mean(dims).to_series()
    return obs_sum / ones_out

Notice the .astype('float32') in the weighted_temporal_mean function and in the following line:

areacello = grid.TAREA.where(grid.KMT>0).astype('float32')

3) The resultant dataset when computing the following is so tiny (~10MB) that I recommend avoiding the re-chunking and loading everything in memory before writing to disk (in netCDF)

aw_oceanT_global = (Annual_oceanT_H11 * areacello).sum(['nlat','nlon'])/areacello.sum()
aw_oceanT_global.load()
aw_oceanT_global.to_dataset(name='TEMP').to_netcdf("............")
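
To illustrate suggestions 2 and 3 on something small, here is a rough NumPy-only sketch of an area-weighted mean computed in float32. The grid size, the random values, and the `land` mask are all made up for the example; `np.nansum` stands in for xarray's `.sum()`, which skips NaN for float data by default.

```python
import numpy as np

rng = np.random.default_rng(0)

# Hypothetical small grid standing in for (time, nlat, nlon).
temp64 = rng.normal(10.0, 2.0, size=(4, 6, 8))   # float64 by default
area64 = rng.uniform(1.0, 2.0, size=(6, 8))

# Mask a few "land" cells, as grid.TAREA.where(grid.KMT > 0) would.
land = np.zeros((6, 8), dtype=bool)
land[0, :3] = True
area64[land] = np.nan
temp64[:, land] = np.nan

# Casting to float32 halves the memory of each array.
temp32 = temp64.astype("float32")
area32 = area64.astype("float32")
print(temp64.nbytes, temp32.nbytes)

# Area-weighted mean over (nlat, nlon), skipping masked cells.
aw_mean = np.nansum(temp32 * area32, axis=(1, 2)) / np.nansum(area32)
print(aw_mean.shape, aw_mean.dtype)
```

The result has one value per time step and stays in float32, which is why the reduced array is small enough to load into memory before writing.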

Anderson Banihirwe (Jan 31 2022 at 17:34):

I am planning to tinker with this later today to see if I can get it to work on the full dataset. I'll keep you posted...

Mira Berdahl (Feb 02 2022 at 22:24):

Hi @Anderson Banihirwe
Sorry for the delay on my end (childcare center closures have derailed me). Anyway, thanks for these suggestions. I'm able to get these methods to work while loading half the dataset. Any reason why you recommend saving as netcdf over zarr in this case?
I'll look out for any other suggestions you have for getting this working for the full dataset. Thanks!

Mira Berdahl (Feb 02 2022 at 22:56):

Actually I take part of this back. I can get my notebook to work for half my dataset if I stick to saving to zarr. The process of loading the area weighted mean and then saving to nc fails with a KilledWorker error. So it seems like half is still too big for that method.

Anderson Banihirwe (Feb 02 2022 at 23:05):

Sorry for the delay on my end (childcare center closures have derailed me). Anyway, thanks for these suggestions.

No worries...

Actually I take part of this back. I can get my notebook to work for half my dataset if I stick to saving to zarr. The process of loading the area weighted mean and then saving to nc fails with a KilledWorker error. So it seems like half is still too big for that method.

Interesting :thinking: So mysterious... Is your notebook still active? If so, could you send me the output of the following commands:

import dask
print(dask.config.get("jobqueue.pbs.log-directory"))

Mira Berdahl (Feb 02 2022 at 23:12):

Sure thing, it gives back the following path:
/glade/scratch/mberdahl/dask/casper-dav/logs

Also having issues with getting enough workers, I request 12, but only ever receive 4 despite waiting a few hrs...

Anderson Banihirwe (Feb 02 2022 at 23:15):

Also having issues with getting enough workers, I request 12, but only ever receive 4 despite waiting a few hrs...

I believe this is a CASPER issue (not sure but it appears to have a bunch of pending jobs in the queue)

Anderson Banihirwe (Feb 02 2022 at 23:16):

What's the output of

print(dask.config.get("distributed"))

Mira Berdahl (Feb 02 2022 at 23:17):

This is what I see when I run that:
{'scheduler': {'bandwidth': 1000000000, 'allowed-failures': 3, 'blocked-handlers': [], 'default-data-size': '1kiB', 'events-cleanup-delay': '1h', 'idle-timeout': None, 'transition-log-length': 100000, 'events-log-length': 100000, 'work-stealing': True, 'work-stealing-interval': '100ms', 'worker-ttl': None, 'pickle': True, 'preload': [], 'preload-argv': [], 'unknown-task-duration': '500ms', 'default-task-durations': {'rechunk-split': '1us', 'split-shuffle': '1us'}, 'validate': False, 'dashboard': {'status': {'task-stream-length': 1000}, 'tasks': {'task-stream-length': 100000}, 'tls': {'ca-file': None, 'key': None, 'cert': None}, 'bokeh-application': {'allow_websocket_origin': ['*'], 'keep_alive_milliseconds': 500, 'check_unused_sessions_milliseconds': 500}}, 'locks': {'lease-validation-interval': '10s', 'lease-timeout': '30s'}, 'http': {'routes': ['distributed.http.scheduler.prometheus', 'distributed.http.scheduler.info', 'distributed.http.scheduler.json', 'distributed.http.health', 'distributed.http.proxy', 'distributed.http.statics']}, 'allowed-imports': ['dask', 'distributed'], 'active-memory-manager': {'start': False, 'interval': '2s', 'policies': [{'class': 'distributed.active_memory_manager.ReduceReplicas'}]}}, 'worker': {'memory': {'target': 0.9, 'spill': False, 'pause': 0.8, 'terminate': 0.95, 'recent-to-old-time': '30s', 'rebalance': {'measure': 'optimistic', 'sender-min': 0.3, 'recipient-max': 0.6, 'sender-recipient-gap': 0.1}}, 'blocked-handlers': [], 'multiprocessing-method': 'spawn', 'use-file-locking': True, 'connections': {'outgoing': 50, 'incoming': 10}, 'preload': [], 'preload-argv': [], 'daemon': True, 'validate': False, 'resources': {}, 'lifetime': {'duration': None, 'stagger': '0 seconds', 'restart': False}, 'profile': {'interval': '10ms', 'cycle': '1000ms', 'low-level': False}, 'http': {'routes': ['distributed.http.worker.prometheus', 'distributed.http.health', 'distributed.http.statics']}}, 'comm': {'compression': None, 'retry': {'count': 0, 
'delay': {'min': '1s', 'max': '20s'}}, 'shard': '64MiB', 'offload': '10MiB', 'default-scheme': 'tcp', 'socket-backlog': 2048, 'recent-messages-log-length': 0, 'ucx': {'cuda-copy': None, 'tcp': None, 'nvlink': None, 'infiniband': None, 'rdmacm': None, 'net-devices': None, 'reuse-endpoints': None, 'create-cuda-context': None}, 'zstd': {'level': 3, 'threads': 0}, 'timeouts': {'connect': '30s', 'tcp': '30s'}, 'require-encryption': None, 'tls': {'ciphers': None, 'min-version': 1.2, 'max-version': None, 'ca-file': None, 'scheduler': {'cert': None, 'key': None}, 'worker': {'key': None, 'cert': None}, 'client': {'key': None, 'cert': None}}, 'tcp': {'backend': 'tornado'}, 'websockets': {'shard': '8MiB'}}, 'dashboard': {'link': 'https://jupyterhub.hpc.ucar.edu/stable/user/{USER}/FirstTry/proxy/{port}/status', 'export-tool': False, 'graph-max-items': 5000, 'prometheus': {'namespace': 'dask'}}, 'version': 2, 'nanny': {'preload': [], 'preload-argv': [], 'environ': {'MALLOC_TRIM_THRESHOLD_': 65536, 'OMP_NUM_THREADS': 1, 'MKL_NUM_THREADS': 1}}, 'client': {'heartbeat': '5s', 'scheduler-info-interval': '2s'}, 'deploy': {'lost-worker-timeout': '15s', 'cluster-repr-interval': '500ms'}, 'adaptive': {'interval': '1s', 'target-duration': '5s', 'minimum': 0, 'maximum': inf, 'wait-count': 3}, 'diagnostics': {'nvml': True, 'computations': {'max-history': 100, 'ignore-modules': ['distributed', 'dask', 'xarray', 'cudf', 'cuml', 'prefect', 'xgboost']}}, 'admin': {'tick': {'interval': '20ms', 'limit': '3s'}, 'max-error-length': 10000, 'log-length': 10000, 'log-format': '%(name)s - %(levelname)s - %(message)s', 'pdb-on-err': False, 'system-monitor': {'interval': '500ms'}, 'event-loop': 'tornado'}, 'rmm': {'pool-size': None}}

Anderson Banihirwe (Feb 02 2022 at 23:20):

I wasn't expecting it to return this much information :smile:. Try the following

dask.config.get("distributed.worker.memory")

Mira Berdahl (Feb 02 2022 at 23:21):

haha, ok
this looks more reasonable:
{'target': 0.9,
'spill': False,
'pause': 0.8,
'terminate': 0.95,
'recent-to-old-time': '30s',
'rebalance': {'measure': 'optimistic',
'sender-min': 0.3,
'recipient-max': 0.6,
'sender-recipient-gap': 0.1}}
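
For reference, the fractional settings above are relative to each worker's memory limit: `target` and `spill` control spilling to disk, `pause` is where a worker stops accepting new tasks, and `terminate` is where the nanny restarts it. A small sketch that turns them into absolute sizes; the 20 GiB limit and the helper name `thresholds_gib` are assumptions for illustration, not values from this cluster:

```python
# Settings copied from the output above.
memory_cfg = {"target": 0.9, "spill": False, "pause": 0.8, "terminate": 0.95}

def thresholds_gib(cfg, memory_limit_gib):
    """Map each enabled fractional threshold to an absolute size in GiB."""
    return {
        name: frac * memory_limit_gib
        for name, frac in cfg.items()
        if frac is not False and frac is not None
    }

# Hypothetical per-worker memory limit; substitute the value your cluster uses.
limits = thresholds_gib(memory_cfg, memory_limit_gib=20)
for name, gib in sorted(limits.items(), key=lambda kv: kv[1]):
    print(f"{name:<9} at {gib:.1f} GiB")
```

A disabled threshold (`False`) is simply skipped, so `spill` does not appear in the result.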

Anderson Banihirwe (Feb 02 2022 at 23:31):

I was looking for the spill-to-disk setting, and it appears to be off: 'spill': False (this is the recommended option).

From one of your worker logs, I am seeing that your workers are getting killed by PBS for exceeding the wall time...

=>> PBS: job killed: walltime 7257 exceeded limit 7200
distributed.dask_worker - INFO - Exiting on signal 15
distributed.nanny - INFO - Closing Nanny at 'tcp://10.12.206.65:46025'
distributed.nanny - INFO - Closing Nanny at 'tcp://10.12.206.65:39530'
distributed.nanny - INFO - Closing Nanny at 'tcp://10.12.206.65:40172'
distributed.nanny - INFO - Closing Nanny at 'tcp://10.12.206.65:33653'
distributed.nanny - INFO - Worker process 94660 was killed by signal 15
distributed.nanny - INFO - Worker process 94656 was killed by signal 15
distributed.nanny - INFO - Worker process 94663 was killed by signal 15
distributed.nanny - INFO - Worker process 94653 was killed by signal 15
distributed.dask_worker - INFO - End worker

I presume these workers are getting killed while they are still trying to write data to netCDF... Is this actually the case?

If Zarr is working for you, please disregard my suggestion of using netCDF... Xarray + dask can hang forever when writing data to netCDF. I don't know why but I've seen it a few times...
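
When checking several worker logs, it can help to separate walltime kills like the one above from other failures. A minimal sketch using only the standard library; the pattern is based on the single PBS line quoted above and may need adjusting for other PBS installations, and `walltime_overrun` is a hypothetical helper name:

```python
import re

# Pattern for the PBS message shown above; the wording is taken from that
# one log line and may differ on other systems.
WALLTIME_KILL = re.compile(r"PBS: job killed: walltime (\d+) exceeded limit (\d+)")

def walltime_overrun(log_text):
    """Return (used, limit) in seconds if PBS killed the job for walltime, else None."""
    match = WALLTIME_KILL.search(log_text)
    if match is None:
        return None
    used, limit = (int(g) for g in match.groups())
    return used, limit

sample = """\
=>> PBS: job killed: walltime 7257 exceeded limit 7200
distributed.dask_worker - INFO - Exiting on signal 15
"""
result = walltime_overrun(sample)
print(result)
```

A log that matches points to the job's time limit rather than to worker memory as the reason the workers went away.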

Mira Berdahl (Feb 02 2022 at 23:32):

OK I'll stick to zarr for now then.
In any case, I don't think I ever got more than 4 workers to launch before the 2hr walltime was exceeded, so I think whatever is being killed never even had a chance to get going. This has been the case for me for many days, almost a week, now...

Anderson Banihirwe (Feb 02 2022 at 23:35):

In any case, I don't think I ever got more than 4 workers to launch before the 2hr walltime was exceeded, so I think whatever is being killed never even had a chance to get going. This has been the case for me for many days, almost a week, now...

Unfortunately, in some cases, the number of dask workers in your cluster matters, and you might not be able to process the data unless you have enough workers with enough resources (memory-wise).

Can you try running your notebook/analysis on Cheyenne instead of Casper when you get a chance?

Anderson Banihirwe (Feb 02 2022 at 23:37):

Getting the right resources on Casper can be a headache sometimes especially when so many users are active at the same time...

Mira Berdahl (Feb 02 2022 at 23:58):

OK, looks like I can get my 12 workers fairly quickly when using Cheyenne, and in so doing, load the entire dataset. Still waiting to see if it can write to zarr or if it crashes.

Mira Berdahl (Feb 03 2022 at 00:29):

It takes a while, but this runs on Cheyenne now! Many thanks for your ideas and help here @Anderson Banihirwe .

Anderson Banihirwe (Feb 03 2022 at 14:39):

Glad to hear it :tada: and you are welcome!


Last updated: May 16 2025 at 17:14 UTC