"""
Data Flow Classes and Functions
This module contains the classes and functions needed to define Data Flows.
A Data Flow is a directed acyclic graph (DAG) that describes the flow of data
from one node in the graph to another. Each node in the flow represents a
data action of some sort, such as reading data from file, transposing data,
unit conversion, addition, subtraction, etc. The data transmitted along the
graph edges is assumed to a Numpy.NDArray-like object.
The action associated with each node is not performed until the data is
"requested" with the __getitem__ interface, via Node[key].
Copyright 2017-2020, University Corporation for Atmospheric Research
LICENSE: See the LICENSE.rst file for details
"""
from warnings import warn
import numpy
from asaptools.partition import WeightBalanced
from asaptools.simplecomm import SimpleComm, create_comm
from pyconform.datasets import DefinitionWarning, InputDatasetDesc, OutputDatasetDesc
from pyconform.flownodes import (
DataNode,
EvalNode,
MapNode,
ReadNode,
ValidateNode,
WriteNode,
iter_dfs,
)
from pyconform.functions import find_function, find_operator
from pyconform.parsing import FuncType, OpType, VarType, parse_definition
from pyconform.physarray import PhysArray


class VariableNotFoundError(ValueError):
    """Indicate that an input variable could not be found during construction"""


class DataFlow(object):
    """
    An object describing the flow of data from input to output
    """
    def __init__(self, inpds, outds):
        """
        Initializer

        Parameters:
            inpds (InputDatasetDesc): The input dataset to use as reference when
                parsing variable definitions
            outds (OutputDatasetDesc): The output dataset defining the output variables and
                their definitions or data
        """
# Input dataset
if not isinstance(inpds, InputDatasetDesc):
raise TypeError("Input dataset must be of InputDatasetDesc type")
self._ids = inpds
# Output dataset
if not isinstance(outds, OutputDatasetDesc):
raise TypeError("Output dataset must be of OutputDatasetDesc type")
self._ods = outds
# Create a dictionary of DataNodes from variables with non-string
# definitions
datnodes = self._create_data_nodes_()
# Create a dictionary to store FlowNodes for variables with string
# definitions
defnodes = self._create_definition_nodes_(datnodes)
# Compute the definition node info objects (zero-sized physarrays)
definfos = self._compute_node_infos_(defnodes)
# Construct the dimension map
self._i2omap, self._o2imap = self._compute_dimension_maps_(definfos)
# Create the map nodes
defnodes = self._create_map_nodes_(defnodes, definfos)
# Create the validate nodes for each valid output variable
self._valnodes = self._create_validate_nodes_(datnodes, defnodes)
# Get the set of all sum-like dimensions (dimensions that cannot be
# broken into chunks)
self._sumlike_dimensions = self._find_sumlike_dimensions_()
# Create the WriteNodes for each time-series output file
self._writenodes = self._create_write_nodes_()
# Compute the bytesizes of each output variable
varsizes = self._compute_variable_sizes_()
# Compute the file sizes for each output file
        self._filesizes = self._compute_file_sizes_(varsizes)
def _create_data_nodes_(self):
datnodes = {}
for vname in self._ods.variables:
vdesc = self._ods.variables[vname]
if not isinstance(vdesc.definition, str):
if vdesc.datatype == "char":
vdata = numpy.asarray(vdesc.definition, dtype="S")
else:
vdata = numpy.asarray(vdesc.definition, dtype=vdesc.dtype)
vunits = vdesc.cfunits()
vdims = tuple(vdesc.dimensions.keys())
varray = PhysArray(vdata, name=vname, units=vunits, dimensions=vdims)
datnodes[vname] = DataNode(varray)
return datnodes
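
    # Illustration (hypothetical variable): an output variable whose
    # definition is data rather than a string, e.g. definition=[0., 30., 60.]
    # with dimensions ('lat',), is wrapped above in a named PhysArray inside a
    # DataNode; variables with string definitions are parsed below instead.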
def _create_definition_nodes_(self, datnodes):
defnodes = {}
for vname in self._ods.variables:
vdesc = self._ods.variables[vname]
if isinstance(vdesc.definition, str):
try:
pdef = parse_definition(vdesc.definition)
vnode = self._construct_flow_(pdef, datnodes=datnodes)
except VariableNotFoundError as err:
warn(
"{}. Skipping output variable {}.".format(str(err), vname),
DefinitionWarning,
)
else:
defnodes[vname] = vnode
return defnodes
    def _construct_flow_(self, obj, datnodes=None):
        datnodes = {} if datnodes is None else datnodes  # avoid mutable default
if isinstance(obj, VarType):
vname = obj.key
if vname in self._ids.variables:
indices = numpy.index_exp[tuple(obj.ind)] if len(obj.ind) > 0 else ()
return ReadNode(self._ids.variables[vname], index=indices)
elif vname in datnodes:
return datnodes[vname]
else:
raise VariableNotFoundError(
"Input variable {!r} not found or cannot be used as input".format(
vname
)
)
elif isinstance(obj, OpType):
name = obj.key
nargs = len(obj.args)
op = find_operator(name, numargs=nargs)
args = [self._construct_flow_(arg, datnodes=datnodes) for arg in obj.args]
return EvalNode(name, op, *args)
elif isinstance(obj, FuncType):
name = obj.key
func = find_function(name)
args = [self._construct_flow_(arg, datnodes=datnodes) for arg in obj.args]
kwds = {
k: self._construct_flow_(obj.kwds[k], datnodes=datnodes)
for k in obj.kwds
}
return EvalNode(name, func, *args, **kwds)
else:
return obj
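
    # Illustration (hypothetical definition): parsing a string such as
    # "T + 273.15" yields an OpType whose arguments are a VarType ('T') and
    # the constant 273.15, so _construct_flow_ returns (roughly)
    # EvalNode('+', op, ReadNode(<T>), 273.15), where op is the callable
    # returned by find_operator('+', numargs=2).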
def _compute_node_infos_(self, nodes):
# Gather information about each FlowNode's metadata (via empty
# PhysArrays)
infos = {}
for name in nodes:
node = nodes[name]
try:
info = node[None]
except Exception as err:
ndef = self._ods.variables[name].definition
err_msg = "Failure to generate variable {!r} info with definition {!r}: {}".format(
name, ndef, str(err)
)
raise RuntimeError(err_msg)
else:
infos[name] = info
return infos
def _compute_dimension_maps_(self, definfos):
# Each output variable FlowNode must be mapped to its output dimensions.
# To aid with this, we sort by number of dimensions:
nodeorder = list(
zip(
*sorted(
(len(self._ods.variables[vname].dimensions), vname)
for vname in definfos
)
)
)[1]
# Now, we construct the dimension maps
i2omap = {}
o2imap = {}
for vname in nodeorder:
out_dims = tuple(self._ods.variables[vname].dimensions.keys())
inp_dims = definfos[vname].dimensions
unmapped_out = tuple(d for d in out_dims if d not in o2imap)
mapped_inp = tuple(o2imap[d] for d in out_dims if d in o2imap)
unmapped_inp = tuple(d for d in inp_dims if d not in mapped_inp)
if len(unmapped_out) != len(unmapped_inp):
map_str = ", ".join("{}-->{}".format(k, i2omap[k]) for k in i2omap)
err_msg = (
"Cannot map dimensions {} to dimensions {} in output variable {} "
"(MAP: {})"
).format(inp_dims, out_dims, vname, map_str)
raise ValueError(err_msg)
if len(unmapped_out) == 0:
continue
for out_dim, inp_dim in zip(unmapped_out, unmapped_inp):
o2imap[out_dim] = inp_dim
i2omap[inp_dim] = out_dim
# Now that we know how dimensions are mapped, compute the output
# dimension sizes
for dname, ddesc in self._ods.dimensions.items():
if dname in o2imap and o2imap[dname] in self._ids.dimensions:
idd = self._ids.dimensions[o2imap[dname]]
if (
ddesc.is_set() and ddesc.stringlen and ddesc.size < idd.size
) or not ddesc.is_set():
ddesc.set(idd)
return i2omap, o2imap
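
    # Illustration (hypothetical names): if the first (fewest-dimension)
    # variable has output dimensions ('lat', 'lon') and its definition info
    # reports input dimensions ('y', 'x'), the positional pairing above
    # records y-->lat and x-->lon; later variables reuse these entries, so
    # only still-unmapped dimensions are paired, and a mismatch in the number
    # of unmapped dimensions raises the ValueError above.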
@property
def dimension_map(self):
"""The internally generated input-to-output dimension name map"""
return self._i2omap
def _create_map_nodes_(self, defnodes, definfos):
mapnodes = {}
for vname in defnodes:
dnode = defnodes[vname]
dinfo = definfos[vname]
map_dims = tuple(self._i2omap[d] for d in dinfo.dimensions)
name = "map({!s}, to={})".format(vname, map_dims)
mapnodes[vname] = MapNode(name, dnode, self._i2omap)
return mapnodes
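
    # Illustration: with an input-to-output map of {'y': 'lat', 'x': 'lon'}
    # (hypothetical), a definition node with dimensions ('y', 'x') is wrapped
    # in a MapNode labeled "map(v, to=('lat', 'lon'))" that relabels its
    # dimensions to the output names when evaluated.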
def _create_validate_nodes_(self, datnodes, defnodes):
valid_vars = tuple(datnodes.keys()) + tuple(defnodes.keys())
valnodes = {}
for vname in valid_vars:
vdesc = self._ods.variables[vname]
vnode = datnodes[vname] if vname in datnodes else defnodes[vname]
try:
validnode = ValidateNode(vdesc, vnode)
except Exception as err:
vdef = vdesc.definition
err_msg = "Failure in variable {!r} with definition {!r}: {}".format(
vname, vdef, str(err)
)
raise RuntimeError(err_msg)
valnodes[vname] = validnode
return valnodes
def _find_sumlike_dimensions_(self):
unmapped_sumlike_dimensions = set()
for vname in self._valnodes:
vnode = self._valnodes[vname]
for nd in iter_dfs(vnode):
if isinstance(nd, EvalNode):
unmapped_sumlike_dimensions.update(nd.sumlike_dimensions)
# Map the sum-like dimensions to output dimensions
return set(
self._i2omap[d] for d in unmapped_sumlike_dimensions if d in self._i2omap
)
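
    # Sum-like dimensions are those an EvalNode reduces over (e.g., a
    # hypothetical zonal mean over 'lon'); because a reduction needs the
    # entire dimension at once, execute() refuses to chunk over them.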
def _create_write_nodes_(self):
writenodes = {}
for fname in self._ods.files:
fdesc = self._ods.files[fname]
vmissing = tuple(
vname for vname in fdesc.variables if vname not in self._valnodes
)
if vmissing:
warn(
"Skipping output file {} due to missing required variables: "
"{}".format(fname, ", ".join(sorted(vmissing))),
DefinitionWarning,
)
else:
vnodes = tuple(self._valnodes[vname] for vname in fdesc.variables)
wnode = WriteNode(fdesc, inputs=vnodes)
writenodes[wnode.label] = wnode
return writenodes
    def _compute_variable_sizes_(self):
        bytesizes = {}
        for vname in self._valnodes:
            vdesc = self._ods.variables[vname]
            # A variable's element count is the product of its dimension
            # sizes (an empty product, i.e. a scalar, counts as 1)
            vsize = int(numpy.prod([ddesc.size for ddesc in vdesc.dimensions.values()]))
            vsize = 1 if vsize == 0 else vsize
            bytesizes[vname] = vsize * vdesc.dtype.itemsize
        return bytesizes
    def _compute_file_sizes_(self, varsizes):
filesizes = {}
for fname, wnode in self._writenodes.items():
filesizes[fname] = sum(varsizes[vnode.label] for vnode in wnode.inputs)
return filesizes
    def execute(
        self,
        chunks=None,
        serial=False,
        history=False,
        scomm=None,
        deflate=None,
        debug=False,
    ):
"""
Execute the Data Flow
Parameters:
chunks (dict): A dictionary of output dimension names and chunk sizes for each
dimension given. Output dimensions not included in the dictionary will not be
chunked. (Use OrderedDict to preserve order of dimensions, where the first
dimension will be assumed to correspond to the fastest-varying index and the last
dimension will be assumed to correspond to the slowest-varying index.)
serial (bool): Whether to run in serial (True) or parallel (False)
history (bool): Whether to write a history attribute generated during execution
for each variable in the file
scomm (SimpleComm): An externally created SimpleComm object to use for managing
parallel operation
deflate (int): Override all output file deflate levels with given value
debug (bool): Whether to enable some rudimentary debugging features
"""
        # Check chunks type (None means no chunking)
        if chunks is None:
            chunks = {}
        if not isinstance(chunks, dict):
            raise TypeError("Chunks must be specified with a dictionary")
# Make sure that the specified chunking dimensions are valid
for odname, odsize in chunks.items():
if odname not in self._o2imap:
raise ValueError(
"Cannot chunk over unknown output dimension {!r}".format(odname)
)
            if not isinstance(odsize, int):
                raise TypeError(
                    "Chunk size invalid for output dimension {!r}: {}".format(
                        odname, odsize
                    )
                )
# Check that we are not chunking over any "sum-like" dimensions
sumlike_chunk_dims = sorted(d for d in chunks if d in self._sumlike_dimensions)
if len(sumlike_chunk_dims) > 0:
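            # In debug mode, silently drop chunking over the sum-like
            # dimensions instead of raising an error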
if debug:
for d in sumlike_chunk_dims:
chunks.pop(d)
else:
raise ValueError(
'Cannot chunk over dimensions that are summed over (or "sum-like")'
": {}".format(", ".join(sumlike_chunk_dims))
)
# Create the simple communicator, if necessary
if scomm is None:
scomm = create_comm(serial=bool(serial))
elif isinstance(scomm, SimpleComm):
if scomm.is_manager():
print(
"Inheriting SimpleComm object from parent. (Ignoring serial argument.)"
)
else:
raise TypeError("Communication object is not a SimpleComm!")
# Start general output
prefix = "[{}/{}]".format(scomm.get_rank(), scomm.get_size())
if scomm.is_manager():
print("Beginning execution of data flow...")
print("Mapping Input Dimensions to Output Dimensions:")
for d in sorted(self._i2omap):
print(" {} --> {}".format(d, self._i2omap[d]))
if len(chunks) > 0:
print("Chunking over Output Dimensions:")
for d in chunks:
print(" {}: {}".format(d, chunks[d]))
else:
print("Not chunking output.")
# Partition the output files/variables over available parallel (MPI)
# ranks
fnames = scomm.partition(
list(self._filesizes.items()), func=WeightBalanced(), involved=True
)
if scomm.is_manager():
print(
"Writing {} files across {} MPI processes.".format(
len(self._filesizes), scomm.get_size()
)
)
scomm.sync()
# Standard output
print("{}: Writing {} files: {}".format(prefix, len(fnames), ", ".join(fnames)))
scomm.sync()
# Loop over output files and write using given chunking
for fname in fnames:
print("{}: Writing file: {}".format(prefix, fname))
if history:
self._writenodes[fname].enable_history()
else:
self._writenodes[fname].disable_history()
self._writenodes[fname].execute(chunks=chunks, deflate=deflate)
print("{}: Finished writing file: {}".format(prefix, fname))
scomm.sync()
if scomm.is_manager():
print("All output variables written.")
print()