
"""
Data Flow Node Classes and Functions

This module contains the classes and functions needed to define nodes in Data Flows.

Copyright 2017-2020, University Corporation for Atmospheric Research
LICENSE: See the LICENSE.rst file for details
"""

from collections import OrderedDict
from datetime import datetime
from os import makedirs, rename
from os.path import dirname, exists
from warnings import warn

import numpy
from cf_units import Unit, num2date
from netCDF4 import Dataset

from pyconform.datasets import FileDesc, VariableDesc
from pyconform.functions import Function
from pyconform.indexing import align_index, index_str, index_tuple, join
from pyconform.physarray import CharArray, PhysArray



class ValidationWarning(Warning):
    """Warning for validation errors"""


class UnitsWarning(Warning):
    """Warning for units errors"""


class DateTimeAutoParseWarning(Warning):
    """Warning issued when a new filename cannot be autoparsed from the date-time
    values in the file"""


def iter_dfs(node):
    """
    Iterate through a graph of FlowNodes from a starting node using a depth-first search

    Parameters:
        node (FlowNode): the starting node from which to begin iterating
    """
    if not isinstance(node, FlowNode):
        raise TypeError("Can only iterate over FlowNodes")
    visited = set()
    tosearch = [node]
    while tosearch:
        nd = tosearch.pop()
        visited.add(nd)
        if isinstance(nd, FlowNode):
            tosearch.extend(i for i in nd.inputs if i not in visited)
        yield nd


def iter_bfs(node):
    """
    Iterate through a graph of FlowNodes from a starting node using a breadth-first search

    Parameters:
        node (FlowNode): the starting node from which to begin iterating
    """
    if not isinstance(node, FlowNode):
        raise TypeError("Can only iterate over FlowNodes")
    visited = set()
    tosearch = [node]
    while tosearch:
        nd = tosearch.pop(0)
        visited.add(nd)
        if isinstance(nd, FlowNode):
            tosearch.extend(i for i in nd.inputs if i not in visited)
        yield nd
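
# A minimal traversal sketch (hypothetical data, assuming in-memory DataNodes;
# see the classes defined below).  DFS pops from the end of the search list and
# BFS from the front, so sibling order differs between the two:
#
#     a = DataNode(PhysArray([1.0, 2.0], name="a", dimensions=("x",)))
#     b = DataNode(PhysArray([3.0, 4.0], name="b", dimensions=("x",)))
#     root = FlowNode("root", a, b)
#     [n.label for n in iter_dfs(root)]  # ['root', 'b', 'a']
#     [n.label for n in iter_bfs(root)]  # ['root', 'a', 'b']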


class FlowNode(object):
    """
    The base class for objects that can appear in a data flow

    The FlowNode object represents a point in the directed acyclic graph where multiple
    edges meet.  It represents a functional operation on the DataArrays coming into it
    from its adjacent DataNodes.  The FlowNode itself outputs the result of this
    operation through the __getitem__ interface (i.e., FlowNode[item]), returning a
    slice of a PhysArray.
    """

    def __init__(self, label, *inputs):
        """
        Initializer

        Parameters:
            label: A label to give the FlowNode
            inputs (list): DataNodes that provide input into this FlowNode
        """
        self._label = label
        self._inputs = list(inputs)

    @property
    def label(self):
        """The FlowNode's label"""
        return self._label

    @property
    def inputs(self):
        """Inputs into this FlowNode"""
        return self._inputs


class DataNode(FlowNode):
    """
    FlowNode class to create data in memory

    This is a "source" FlowNode.
    """

    def __init__(self, data):
        """
        Initializer

        Parameters:
            data (PhysArray): Data to store in this FlowNode
        """
        # Determine type and upcast, if necessary
        array = PhysArray(data)
        if issubclass(array.dtype.type, numpy.floating) and array.dtype.itemsize < 8:
            array = array.astype(numpy.float64)

        # Store data
        self._data = array

        # Call base class initializer
        super(DataNode, self).__init__(self._data.name)

    def __getitem__(self, index):
        """
        Compute and retrieve the data associated with this FlowNode operation
        """
        return self._data[index]
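
# Usage sketch (hypothetical values): DataNode wraps in-memory data and serves
# slices of it through the FlowNode __getitem__ interface.  Note that a float32
# input is upcast to float64 on construction:
#
#     dnode = DataNode(PhysArray(numpy.arange(3, dtype="f4"), name="x",
#                                dimensions=("i",)))
#     dnode[:]       # float64 PhysArray named 'x'
#     dnode.label    # 'x' (taken from the PhysArray name)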


class ReadNode(FlowNode):
    """
    FlowNode class for reading data from a NetCDF file

    This is a "source" FlowNode.
    """

    def __init__(self, variable, index=slice(None)):
        """
        Initializer

        Parameters:
            variable (VariableDesc): A variable descriptor object
            index (tuple, slice, int, dict): A tuple of slices or ints, or a slice or
                int, specifying the range of data to read from the file (in file-local
                indices)
        """
        # Check variable descriptor type and existence in the file
        if not isinstance(variable, VariableDesc):
            raise TypeError(
                "Unrecognized variable descriptor of type "
                "{!r}: {!r}".format(type(variable), variable.name)
            )

        # Check for an associated file
        if len(variable.files) == 0:
            raise ValueError(
                "Variable descriptor {} has no associated files".format(variable.name)
            )
        self._filepath = None
        for fdesc in variable.files.values():
            if fdesc.exists():
                self._filepath = fdesc.name
                break
        if self._filepath is None:
            raise OSError(
                "File path not found for input variable: {!r}".format(variable.name)
            )

        # Check that the variable exists in the file
        with Dataset(self._filepath, "r") as ncfile:
            if variable.name not in ncfile.variables:
                raise OSError(
                    "Variable {!r} not found in NetCDF file: {!r}".format(
                        variable.name, self._filepath
                    )
                )
        self._variable = variable.name

        # Check if the index means "all"
        is_all = False
        if isinstance(index, slice) and index == slice(None):
            is_all = True
        elif isinstance(index, (list, tuple)) and all(i == slice(None) for i in index):
            is_all = True
        elif isinstance(index, dict) and all(v == slice(None) for v in index.values()):
            is_all = True

        # Store the reading index
        self._index = index

        # Call the base class initializer
        if is_all:
            label = variable.name
        else:
            label = "{}[{}]".format(variable.name, index_str(index))
        super(ReadNode, self).__init__(label)

    def __getitem__(self, index):
        """
        Read a PhysArray from file
        """
        with Dataset(self._filepath, "r") as ncfile:

            # Get a reference to the variable
            ncvar = ncfile.variables[self._variable]

            # Get the attributes into a dictionary, for convenience
            attrs = {a: ncvar.getncattr(a) for a in ncvar.ncattrs()}

            # Read the variable units
            units_attr = attrs.get("units", 1)
            calendar_attr = attrs.get("calendar", None)
            try:
                units = Unit(units_attr, calendar=calendar_attr)
            except ValueError:
                msg = "Units {!r} unrecognized in UDUNITS. Assuming unitless.".format(
                    units_attr
                )
                warn(msg, UnitsWarning)
                units = Unit(1)

            # Read the original variable dimensions
            dimensions0 = ncvar.dimensions

            # Read the original variable shape
            shape0 = ncvar.shape

            # Align the read-indices on dimensions
            index1 = align_index(self._index, dimensions0)

            # Get the dimensions after application of the first index
            dimensions1 = tuple(
                d for d, i in zip(dimensions0, index1) if isinstance(i, slice)
            )

            # Align the second index on the intermediate dimensions
            index2 = align_index(index, dimensions1)

            # Get the dimensions after application of the second index
            dimensions2 = tuple(
                d for d, i in zip(dimensions1, index2) if isinstance(i, slice)
            )

            # Compute the joined index object
            index12 = join(shape0, index1, index2)

            data = ncvar[index12]

            # Upconvert, if possible
            if issubclass(ncvar.dtype.type, numpy.floating) and ncvar.dtype.itemsize < 8:
                data = data.astype(numpy.float64)

            # Read the positive attribute, if available
            pos = attrs.get("positive", None)

            return PhysArray(
                data, name=self.label, units=units, dimensions=dimensions2, positive=pos
            )
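
# Usage sketch, assuming 'vdesc' is a VariableDesc whose associated file exists
# on disk (construction of the descriptor is outside this module):
#
#     rnode = ReadNode(vdesc, index=slice(0, 10))  # first 10 entries, file-local
#     subset = rnode[:]  # PhysArray carrying units/dimensions read from the file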


class EvalNode(FlowNode):
    """
    FlowNode class for evaluating a function on input from neighboring DataNodes

    The EvalNode is constructed with a function reference and any number of arguments
    to that function.  The number of arguments supplied must match the number of
    arguments accepted by the function.  The arguments can be any type, and the order
    of the arguments will be preserved in the call signature of the function.  If the
    arguments are of type FlowNode, then a reference to the FlowNode will be stored.
    If the arguments are of any other type, the argument will be stored by the
    EvalNode.

    This is a "non-source"/"non-sink" FlowNode.
    """

    def __init__(self, label, func, *args, **kwds):
        """
        Initializer

        Parameters:
            label: A label to give the FlowNode
            func (class): A Function class
            args (list): Arguments to the function given by 'func'
            kwds (dict): Keyword arguments to the function given by 'func'
        """
        # Initialize the function object
        self._function = func(*args, **kwds)

        # Include all references as input
        allargs = tuple(args) + tuple(kwds[k] for k in kwds)

        # Call the base class initialization
        super(EvalNode, self).__init__(label, *allargs)

    @property
    def sumlike_dimensions(self):
        """
        Return the set of sum-like dimensions registered by the node's function
        """
        if isinstance(self._function, Function):
            return self._function.sumlike_dimensions
        else:
            return set()

    def __getitem__(self, index):
        """
        Compute and retrieve the data associated with this FlowNode operation
        """
        return self._function[index]
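
# Usage sketch, assuming 'MyFunc' is a hypothetical Function subclass (see
# pyconform.functions) and 'anode'/'bnode' are upstream FlowNodes; the EvalNode
# simply defers indexing to the constructed function object:
#
#     enode = EvalNode("a_plus_b", MyFunc, anode, bnode)
#     result = enode[:]  # equivalent to MyFunc(anode, bnode)[:]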


class MapNode(FlowNode):
    """
    FlowNode class to map input data from a neighboring FlowNode to new dimension
    names and units

    The MapNode can rename the dimensions of a FlowNode's output data.  It does not
    change the data itself, however.  The input dimension names will be changed
    according to the dimension map given.  If an input dimension name is not referenced
    by the map, then the input dimension name does not change.

    This is a "non-source"/"non-sink" FlowNode.
    """

    def __init__(self, label, dnode, dmap={}):
        """
        Initializer

        Parameters:
            label: The label given to the FlowNode
            dnode (FlowNode): FlowNode that provides input into this FlowNode
            dmap (dict): A dictionary mapping dimension names of the input data to
                new dimension names for the output variable
        """
        # Check FlowNode type
        if not isinstance(dnode, FlowNode):
            raise TypeError("MapNode can only act on output from another FlowNode")

        # Check dimension map type
        if not isinstance(dmap, dict):
            raise TypeError("Dimension map must be a dictionary")

        # Store the dimension input-to-output map
        self._i2omap = dmap

        # Construct the reverse mapping
        self._o2imap = dict((v, k) for k, v in dmap.items())

        # Call base class initializer
        super(MapNode, self).__init__(label, dnode)

    def __getitem__(self, index):
        """
        Compute and retrieve the data associated with this FlowNode operation
        """
        # Request the input information without pulling data
        inp_info = self.inputs[0][None]

        # Get the input data dimensions
        inp_dims = inp_info.dimensions

        # The input/output dimensions will be the same
        # OR should be contained in the input-to-output dimension map
        out_dims = tuple(self._i2omap.get(d, d) for d in inp_dims)

        # Compute the input index in terms of input dimensions
        if index is None:
            inp_index = None
        elif isinstance(index, dict):
            inp_index = dict((self._o2imap.get(d, d), i) for d, i in index.items())
        else:
            out_index = index_tuple(index, len(inp_dims))
            inp_index = dict(
                (self._o2imap.get(d, d), i) for d, i in zip(out_dims, out_index)
            )

        # Return the mapped data
        idims_str = ",".join(inp_dims)
        odims_str = ",".join(out_dims)
        if inp_dims == out_dims:
            name = inp_info.name
        else:
            name = "map({}, from=[{}], to=[{}])".format(
                inp_info.name, idims_str, odims_str
            )
        return PhysArray(self.inputs[0][inp_index], name=name, dimensions=out_dims)
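
# Usage sketch (hypothetical names): rename the input dimension 'lon0' to 'lon'
# without touching the data itself:
#
#     mnode = MapNode("remap_x", rnode, dmap={"lon0": "lon"})
#     mnode[None].dimensions  # contains 'lon' in place of 'lon0'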


class ValidateNode(FlowNode):
    """
    FlowNode class to validate input data from a neighboring FlowNode

    The ValidateNode takes additional attributes in its initializer that can affect
    the behavior of its __getitem__ method.  The special attributes are:

        'valid_min': The minimum value the data should have, if valid
        'valid_max': The maximum value the data should have, if valid
        'ok_min_mean_abs': The minimum acceptable value of the mean of the absolute
            value of the data
        'ok_max_mean_abs': The maximum acceptable value of the mean of the absolute
            value of the data

    If these attributes are supplied to the ValidateNode at construction time, then the
    associated validation checks will be made on the data when __getitem__ is called.

    Additional attributes may be added to the ValidateNode that do not affect
    functionality.  These attributes may be named however the user wishes and can be
    retrieved from the FlowNode as a dictionary with the 'attributes' property.

    This is a "non-source"/"non-sink" FlowNode.
    """

    def __init__(self, vdesc, dnode):
        """
        Initializer

        Parameters:
            vdesc (VariableDesc): A variable descriptor object for the output variable
            dnode (FlowNode): FlowNode that provides input into this FlowNode
        """
        # Check types
        if not isinstance(vdesc, VariableDesc):
            raise TypeError("ValidateNode requires a VariableDesc object as input")
        if not isinstance(dnode, FlowNode):
            raise TypeError("ValidateNode can only act on output from another FlowNode")

        # Call base class initializer
        super(ValidateNode, self).__init__(vdesc.name, dnode)

        # Save the variable descriptor object
        self._vdesc = vdesc

        # Initialize the history attribute, if necessary
        info = dnode[None]
        if "history" not in self.attributes:
            self.attributes["history"] = info.name

        # Inherit the units and calendar of the input data stream, if necessary
        if "units" in self.attributes:
            if info.units.is_time_reference():
                ustr, rstr = [c.strip() for c in str(info.units).split("since")]
                if self._vdesc.units() is not None:
                    ustr = self._vdesc.units()
                if self._vdesc.refdatetime() is not None:
                    rstr = self._vdesc.refdatetime()
                self.attributes["units"] = "{} since {}".format(ustr, rstr)

                # Calendars must match, as conversion between different calendars
                # will fail
                if self._vdesc.calendar() is None and info.units.calendar is not None:
                    self.attributes["calendar"] = info.units.calendar
            else:
                if self._vdesc.units() is None:
                    self.attributes["units"] = str(info.units)

    @property
    def attributes(self):
        """
        Attributes dictionary of the variable returned by the ValidateNode
        """
        return self._vdesc.attributes

    @property
    def dimensions(self):
        """
        Dimensions tuple of the variable returned by the ValidateNode
        """
        return tuple(self._vdesc.dimensions.keys())

    def __getitem__(self, index):
        """
        Compute and retrieve the data associated with this FlowNode operation
        """
        # Get the data to validate
        indata = self.inputs[0][index]

        # Check the datatype, and cast as necessary
        if self._vdesc.dtype is None:
            odtype = indata.dtype
        else:
            odtype = self._vdesc.dtype
        if numpy.can_cast(indata.dtype, odtype, casting="same_kind"):
            indata = indata.astype(odtype)
        else:
            raise TypeError(
                "Cannot cast datatype {!s} to {!s} in ValidateNode {!r}".format(
                    indata.dtype, odtype, self.label
                )
            )

        # Check that units match as expected, otherwise convert
        if "units" in self.attributes:
            ounits = Unit(
                self.attributes["units"], calendar=self.attributes.get("calendar", None)
            )
            if ounits != indata.units:
                if index is None:
                    indata.units = ounits
                else:
                    try:
                        indata = indata.convert(ounits)
                    except Exception as err:
                        err_msg = "When validating output variable {}: {}".format(
                            self.label, err
                        )
                        raise err.__class__(err_msg)

        # Check that the dimensions match as expected
        if self.dimensions != indata.dimensions:
            indata = indata.transpose(self.dimensions)

        # Check the positive attribute, if specified
        positive = self.attributes.get("positive", None)
        if positive is not None and indata.positive != positive:
            indata.flip()

        # Do not validate if index is None (nothing to validate)
        if index is None:
            return indata

        # Testing parameters
        valid_min = self.attributes.get("valid_min", None)
        valid_max = self.attributes.get("valid_max", None)
        ok_min_mean_abs = self.attributes.get("ok_min_mean_abs", None)
        ok_max_mean_abs = self.attributes.get("ok_max_mean_abs", None)

        # Validate minimum
        if valid_min is not None:
            dmin = numpy.min(indata)
            if dmin < valid_min:
                msg = "valid_min: {} < {} ({!r})".format(dmin, valid_min, self.label)
                warn(msg, ValidationWarning)
                indata = numpy.ma.masked_where(indata <= valid_min, indata)

        # Validate maximum
        if valid_max is not None:
            dmax = numpy.max(indata)
            if dmax > valid_max:
                msg = "valid_max: {} > {} ({!r})".format(dmax, valid_max, self.label)
                warn(msg, ValidationWarning)
                indata = numpy.ma.masked_where(indata >= valid_max, indata)

        # Compute the mean of the absolute value, if necessary
        if ok_min_mean_abs is not None or ok_max_mean_abs is not None:
            mean_abs = numpy.mean(numpy.abs(indata))

        # Validate minimum mean abs
        if ok_min_mean_abs is not None:
            if mean_abs < ok_min_mean_abs:
                msg = "ok_min_mean_abs: {} < {} ({!r})".format(
                    mean_abs, ok_min_mean_abs, self.label
                )
                warn(msg, ValidationWarning)

        # Validate maximum mean abs
        if ok_max_mean_abs is not None:
            if mean_abs > ok_max_mean_abs:
                msg = "ok_max_mean_abs: {} > {} ({!r})".format(
                    mean_abs, ok_max_mean_abs, self.label
                )
                warn(msg, ValidationWarning)

        return indata
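
# Usage sketch, assuming 'vdesc' carries the output variable's attributes
# (e.g., 'units', 'valid_min', 'valid_max') and 'mnode' is an upstream FlowNode;
# out-of-range values trigger a ValidationWarning and are masked:
#
#     vnode = ValidateNode(vdesc, mnode)
#     checked = vnode[:]  # cast, converted, transposed, and range-checked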


class WriteNode(FlowNode):
    """
    FlowNode that writes validated data to a file.

    This is a "sink" node, meaning that the __getitem__ (i.e., [index]) interface does
    not return anything.  Rather, the data "retrieved" through the __getitem__
    interface is sent directly to file.  For this reason, it is possible to "retrieve"
    data multiple times, resulting in writing and overwriting of data.  To eliminate
    this inefficiency, it is advised that you use the 'execute' method to write the
    data once (and only once).
    """

    def __init__(self, filedesc, inputs=()):
        """
        Initializer

        Parameters:
            filedesc (FileDesc): File descriptor for the file to write
            inputs (tuple): A tuple of ValidateNodes providing input into the file
        """
        # Check filename
        if not isinstance(filedesc, FileDesc):
            raise TypeError("File descriptor must be of FileDesc type")

        # Check and store input variable nodes
        if not isinstance(inputs, (list, tuple)):
            raise TypeError(
                "WriteNode {!r} inputs must be given as a list or tuple".format(
                    filedesc.name
                )
            )
        for inp in inputs:
            if not isinstance(inp, ValidateNode):
                raise TypeError(
                    (
                        "WriteNode {!r} cannot accept input from type {}, must be a "
                        "ValidateNode"
                    ).format(filedesc.name, type(inp))
                )

        # Extract hidden variables (names starting with '_') from the list of input nodes
        hidden_labels = [inp.label for inp in inputs if inp.label[0] == "_"]
        self._hidden_inputs = [inp for inp in inputs if inp.label in hidden_labels]
        inputs = [inp for inp in inputs if inp.label not in hidden_labels]

        # Call base class initializer (label is the filename)
        super(WriteNode, self).__init__(filedesc.name, *inputs)

        # Store the file descriptor for use later
        self._filedesc = filedesc

        # Check that inputs are contained in the file descriptor
        for inp in inputs:
            if inp.label not in self._filedesc.variables:
                raise ValueError(
                    (
                        "WriteNode {!r} takes input from variable {!r} that is not "
                        "contained in the described file"
                    ).format(filedesc.name, inp.label)
                )

        # Construct the proper filename
        fname = self._autoparse_filename_(self.label)
        self._label = fname
        self._filedesc._name = fname
        self._tmp_ext = ".tmp.nc"

        # Set the filehandle
        self._file = None

        # Initialize the set of inverted dimensions
        self._idims = set()

        # Initialize the set of unwritten attributes
        self._unwritten_attributes = {"_FillValue", "direction", "history"}

    def _autoparse_filename_(self, fname):
        """
        Determine if autoparsing the filename needs to be done

        Parameters:
            fname (str): The original name of the file

        Returns:
            str: The new name for the file
        """
        if "{" in fname:

            possible_tvars = []
            possible_inputs = list(self.inputs)
            if self._filedesc.autoparse_time_variable:
                possible_tvars.append(self._filedesc.autoparse_time_variable)
                possible_inputs += self._hidden_inputs
            else:
                for var in self._filedesc.variables:
                    vdesc = self._filedesc.variables[var]
                    if var in ("time", "time1", "time2", "time3"):
                        possible_tvars.append(var)
                    elif (
                        vdesc.cfunits().is_time_reference()
                        and len(vdesc.dimensions) == 1
                    ):
                        possible_tvars.append(var)
                    elif (
                        "standard_name" in vdesc.attributes
                        and vdesc.attributes["standard_name"] == "time"
                    ):
                        possible_tvars.append(var)
                    elif "axis" in vdesc.attributes and vdesc.attributes["axis"] == "T":
                        possible_tvars.append(var)
            if len(possible_tvars) == 0:
                msg = "Could not identify a time variable to autoparse filename {!r}".format(
                    fname
                )
                warn(msg, DateTimeAutoParseWarning)
                return fname

            possible_tnodes = {
                vnode.label: vnode
                for vnode in possible_inputs
                if vnode.label in possible_tvars
            }
            if len(possible_tnodes) == 0:
                raise ValueError(
                    "Time variable input missing for file {!r}".format(fname)
                )
            tnode = (
                possible_tnodes["time"]
                if "time" in possible_tnodes
                else tuple(possible_tnodes.values())[0]
            )

            t1 = tnode[0:1]
            t2 = tnode[-1:]

            while "{" in fname:
                beg = fname.find("{")
                end = fname.find("}", beg)
                if end == -1:
                    raise ValueError(
                        "Filename {!r} has unbalanced special characters".format(fname)
                    )
                prefix = fname[:beg]
                fmtstr1, fmtstr2 = fname[beg + 1 : end].split("-")
                suffix = fname[end + 1 :]

                datestr1 = (
                    num2date(t1.data[0], str(t1.units), t1.units.calendar)
                    .strftime(fmtstr1)
                    .replace(" ", "0")
                )
                datestr2 = (
                    num2date(t2.data[0], str(t2.units), t2.units.calendar)
                    .strftime(fmtstr2)
                    .replace(" ", "0")
                )

                fname = "{}{}-{}{}".format(prefix, datestr1, datestr2, suffix)

        return fname
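
    # Filename autoparsing sketch (hypothetical template): a '{fmt1-fmt2}'
    # segment is replaced by the first and last time values formatted with
    # strftime, e.g. for monthly data spanning year 1 through year 10:
    #
    #     'out.{%Y%m-%Y%m}.nc'  -->  'out.000101-001012.nc'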

    def enable_history(self):
        """
        Enable writing of the history attribute to the file
        """
        if "history" in self._unwritten_attributes:
            self._unwritten_attributes.remove("history")

    def disable_history(self):
        """
        Disable writing of the history attribute to the file
        """
        self._unwritten_attributes.add("history")
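
    # Sketch: history writing is controlled through the set of unwritten
    # attributes, so toggling it only adds or removes the 'history' key:
    #
    #     wnode.disable_history()  # 'history' attribute is skipped on write
    #     wnode.enable_history()   # 'history' attribute is written per variable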

    def _open_(self, deflate=None):
        """
        Open the file for writing, if not open already
        """
        if self._file is None:

            # Make the necessary subdirectories to open the file
            fname = self.label
            tmp_fname = "{}{}".format(fname, self._tmp_ext)
            fdir = dirname(fname)
            fmt = self._filedesc.format
            if len(fdir) > 0 and not exists(fdir):
                try:
                    makedirs(fdir)
                except OSError:
                    if exists(fdir):
                        print(
                            "Already created directory for output file {!r}".format(
                                fname
                            )
                        )
                    else:
                        raise IOError(
                            "Failed to create directory for output file {!r}".format(
                                fname
                            )
                        )

            # Try to open the output file for writing
            try:
                self._file = Dataset(tmp_fname, "w", format=fmt)
            except Exception:
                raise IOError("Failed to open output file {!r}".format(fname))

            # Write the global attributes
            self._filedesc.attributes["creation_date"] = datetime.utcnow().strftime(
                "%Y-%m-%dT%H:%M:%SZ"
            )
            self._file.setncatts(self._filedesc.attributes)

            # Scan over variables for coordinates and dimension information
            req_dims = set()
            for vnode in self.inputs:
                vname = vnode.label
                vdesc = self._filedesc.variables[vname]

                # Get only the dimension descriptors needed by the variables
                for dname in vdesc.dimensions:
                    if dname not in self._filedesc.dimensions:
                        raise KeyError(
                            (
                                "Dimension {!r} needed by variable {!r} is not "
                                "specified in file {!r}"
                            ).format(dname, vname, fname)
                        )
                    req_dims.add(dname)

                # Determine coordinates and dimensions to invert
                if len(vdesc.dimensions) == 1 and "axis" in vnode.attributes:
                    if "direction" in vnode.attributes:
                        vdir_out = vnode.attributes["direction"]
                        if vdir_out not in ["increasing", "decreasing"]:
                            raise ValueError(
                                (
                                    "Unrecognized direction in output coordinate "
                                    "variable {!r} when writing file {!r}"
                                ).format(vname, fname)
                            )
                        vdir_inp = WriteNode._direction_(vnode[:])
                        if vdir_inp is None:
                            raise ValueError(
                                (
                                    "Output coordinate variable {!r} has no calculable "
                                    "direction"
                                ).format(vname)
                            )
                        if vdir_inp != vdir_out:
                            self._idims.add(tuple(vdesc.dimensions.keys())[0])

            # Create the required dimensions in the file
            for dname in req_dims:
                ddesc = self._filedesc.dimensions[dname]
                if not ddesc.is_set():
                    raise RuntimeError(
                        "Cannot create unset dimension {!r} in file {!r}".format(
                            dname, fname
                        )
                    )
                if ddesc.unlimited:
                    self._file.createDimension(dname)
                else:
                    self._file.createDimension(dname, ddesc.size)

            # Create the variables and write their attributes
            for vnode in self.inputs:
                vname = vnode.label
                vdesc = self._filedesc.variables[vname]

                vattrs = OrderedDict((k, v) for k, v in vnode.attributes.items())
                vdtype = vdesc.dtype
                fillval = vattrs.get("_FillValue", None)
                vdims = tuple(vdesc.dimensions.keys())

                if deflate is None:
                    zlib = self._filedesc.deflate > 0
                    clev = self._filedesc.deflate if zlib else 1
                else:
                    if not isinstance(deflate, int):
                        raise TypeError("Override deflate value must be an integer")
                    if deflate < 0 or deflate > 9:
                        raise ValueError(
                            "Override deflate value must range from 0 to 9"
                        )
                    zlib = deflate > 0
                    clev = deflate if zlib else 1

                ncvar = self._file.createVariable(
                    vname, vdtype, vdims, fill_value=fillval, zlib=zlib, complevel=clev
                )

                for aname in vattrs:
                    if aname not in self._unwritten_attributes:
                        avalue = vattrs[aname]
                        if aname == "history":
                            idimstr = ",".join(
                                d for d in vdesc.dimensions if d in self._idims
                            )
                            if len(idimstr) > 0:
                                avalue = "invdims({}, dims=[{}])".format(
                                    avalue, idimstr
                                )
                        ncvar.setncattr(aname, avalue)

    def _close_(self):
        """
        Close the file associated with the WriteNode
        """
        if self._file is not None:
            self._file.close()
            self._idims = set()
            self._file = None
            tmp_fname = "{}{}".format(self.label, self._tmp_ext)
            if exists(tmp_fname):
                rename(tmp_fname, self.label)

    @staticmethod
    def _chunk_iter_(dsizes, chunks={}):
        if not isinstance(dsizes, OrderedDict):
            raise TypeError(
                "Dimensions must be an ordered dictionary of names and sizes"
            )
        if not isinstance(chunks, dict):
            raise TypeError("Dimension chunks must be a dictionary")

        chunks_ = {d: chunks[d] if d in chunks else dsizes[d] for d in dsizes}
        nchunks = {
            d: int(dsizes[d] // chunks_[d]) + int(dsizes[d] % chunks_[d] > 0)
            for d in dsizes
        }
        ntotal = int(numpy.prod([nchunks[d] for d in nchunks]))

        idx = {d: 0 for d in dsizes}
        for n in range(ntotal):
            for d in nchunks:
                n, idx[d] = divmod(n, nchunks[d])
            chunk = OrderedDict()
            for d in dsizes:
                lb = idx[d] * chunks_[d]
                ub = (idx[d] + 1) * chunks_[d]
                chunk[d] = slice(lb, ub if ub < dsizes[d] else None)
            yield chunk

    @staticmethod
    def _invert_dims_(dsizes, chunk, idims=set()):
        if not isinstance(dsizes, OrderedDict):
            raise TypeError(
                "Dimensions must be an ordered dictionary of names and sizes"
            )
        if not isinstance(chunk, OrderedDict):
            raise TypeError("Chunk must be an ordered dictionary of names and slices")
        if not isinstance(idims, set):
            raise TypeError("Dimensions to invert must be a set")

        ichunk = OrderedDict()
        for d in dsizes:
            s = dsizes[d]
            c = chunk[d]
            if d in idims:
                ub = s if c.stop is None else c.stop
                ichunk[d] = slice(s - c.start - 1, s - ub - 1 if ub < s else None, -1)
            else:
                ichunk[d] = c
        return ichunk

    @staticmethod
    def _direction_(data):
        diff = numpy.diff(data)
        if numpy.all(diff > 0):
            return "increasing"
        elif numpy.all(diff < 0):
            return "decreasing"
        else:
            return None
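
    # Chunking sketch (hypothetical sizes): _chunk_iter_ yields one OrderedDict
    # of dimension slices per chunk, with the final partial chunk left open-ended:
    #
    #     gdims = OrderedDict([("x", 5)])
    #     list(WriteNode._chunk_iter_(gdims, chunks={"x": 2}))
    #     # -> [{'x': slice(0, 2)}, {'x': slice(2, 4)}, {'x': slice(4, None)}]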

    def execute(self, chunks={}, deflate=None):
        """
        Execute the writing of the WriteNode file at once

        This method efficiently writes all of the data for each file only once,
        chunking the data according to the 'chunks' parameter, as needed.

        Parameters:
            chunks (dict): A dictionary of output dimension names and chunk sizes for
                each dimension given.  Output dimensions not included in the dictionary
                will not be chunked.  (Use OrderedDict to preserve the order of the
                dimensions, where the first dimension will be assumed to correspond to
                the fastest-varying index and the last dimension will be assumed to
                correspond to the slowest-varying index.)
            deflate (int): Override the output file deflate level with the given value
        """
        # Open the file and write the header information
        self._open_(deflate=deflate)

        # Create a data structure to keep track of which variable chunks have been
        # written
        vchunks = {vnode.label: set() for vnode in self.inputs}

        # Compute the global dimension sizes dictionary from the input variable nodes
        inputdims = []
        for vnode in self.inputs:
            for d in self._filedesc.variables[vnode.label].dimensions:
                if d not in inputdims:
                    inputdims.append(d)
        gdims = OrderedDict(
            (d, self._filedesc.dimensions[d].size) for d in inputdims
        )

        # Iterate over the global dimension space
        for chunk in WriteNode._chunk_iter_(gdims, chunks=chunks):

            # Invert the necessary dimensions to get the read-chunk
            rchunk = self._invert_dims_(gdims, chunk, idims=self._idims)

            # Loop over all variables and write the data, if necessary
            for vnode in self.inputs:
                vname = vnode.label
                vdesc = self._filedesc.variables[vname]
                ncvar = self._file.variables[vname]

                # Compute the write-chunk for the given variable
                wchunk = tuple(chunk[d] for d in vdesc.dimensions)

                # Write the data to the variable, if it hasn't already been written
                if repr(wchunk) not in vchunks[vname]:
                    vdata = vnode[rchunk]
                    if isinstance(vdata, CharArray):
                        vdata = vdata.stretch(ncvar.shape[-1])
                    ncvar[wchunk] = vdata
                    vchunks[vname].add(repr(wchunk))

        # Close the file after completion
        self._close_()
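
# End-to-end usage sketch, assuming 'filedesc' is a FileDesc and 'vnodes' is a
# tuple of ValidateNodes matching the variables described in the file; chunking
# over the (hypothetical) 'time' dimension bounds the memory footprint:
#
#     wnode = WriteNode(filedesc, inputs=vnodes)
#     wnode.execute(chunks={"time": 100})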