Stream: python-questions

Topic: KilledWorker on simple operations


Riley Brady (Jun 09 2020 at 18:20):

Hi all. Generally dask plays pretty nicely with my work if I throw a lot of workers at the problem, but I'm struggling with a KilledWorker error right now in a pretty simple case. I have a 32.4GB dataset and I am taking a mean over a single dimension to reduce it to a ~6.5GB dataset. I tried with 36 cores per node, 36 processes, 100GB memory, and 10 nodes (i.e. 1TB memory and 360 workers). It's chunked such that chunks are about 100MB. Just trying to take the mean over a single dimension and then save to netCDF, I get

KilledWorker: ("('open_dataset-concatenate-c100c09070937c41c0bc3dec0d93593c', 0, 5, 0, 0, 0)", <Worker 'tcp://10.148.7.66:46303', name: 0-0, memory: 0, processing: 1>)

Any thoughts? I tried 18 processes per node instead of 36 to give the workers a little more memory. I've also tried chunking smaller and larger. Again, more workers/nodes generally solves these kinds of things, but these are 100MB chunks with 360 workers on a 32GB dataset, so I shouldn't be hitting this problem.
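
(For reference, a rough sketch of the operation being described; the dimension names, chunk sizes, and file paths below are placeholders, not details from the thread.)

    import xarray as xr

    # Open the ~32 GB dataset lazily with roughly 100 MB chunks
    # (path and chunking are illustrative only)
    ds = xr.open_dataset("big_input.nc", chunks={"time": 500})

    # Reduce over a single dimension (~32 GB -> ~6.5 GB) and write out;
    # to_netcdf triggers the distributed computation that raises KilledWorker
    ds.mean("member").to_netcdf("mean_over_member.nc")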

Deepak Cherian (Jun 09 2020 at 18:24):

does ds.mean("dim").compute() also crash?

Deepak Cherian (Jun 09 2020 at 18:25):

and is the 32.4GB dataset being read straight from disk, or is it the result of some other step?

Riley Brady (Jun 09 2020 at 18:26):

@Deepak Cherian, yep it does. I switched over to .to_netcdf() thinking that might help for some reason. It's read more or less straight from disk: I run xr.open_mfdataset first to concatenate 7 ~4.5GB files into the larger dataset. Maybe that's where the concatenate in the error comes from. I might try doing the open_mfdataset and then saving the combined form out as zarr.
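
(A sketch of the two steps just mentioned, with placeholder paths; writing the combined dataset to zarr is only the idea being floated here, not a confirmed fix.)

    import xarray as xr

    # Concatenate the 7 ~4.5 GB netCDF files into one lazy dataset
    ds = xr.open_mfdataset("input_*.nc", parallel=True)

    # Idea: persist the combined dataset to zarr first, then run the
    # reduction from the zarr store as a separate step
    ds.to_zarr("combined.zarr")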

Deepak Cherian (Jun 09 2020 at 18:27):

are you chunking at the mfdataset stage?

Riley Brady (Jun 09 2020 at 18:31):

I think that fixes it. :) I was running xr.open_mfdataset(files, parallel=True) and then chunking afterwards with ds = ds.chunk(...). I didn't stop to think that it was probably loading it all in as one chunk and then rechunking. It works if I pass chunks=... within the open_mfdataset call. Thanks! Sometimes it's simple.
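
(To summarize the resolution, a sketch contrasting the two approaches; the file pattern and chunk sizes are placeholders.)

    import xarray as xr

    # Problematic: each input file is loaded as a single large chunk,
    # then rechunked after the fact
    ds = xr.open_mfdataset("input_*.nc", parallel=True)
    ds = ds.chunk({"time": 100})

    # Working: pass chunks= to open_mfdataset so the data is chunked
    # as it is read
    ds = xr.open_mfdataset("input_*.nc", parallel=True, chunks={"time": 100})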

