Source code for pyconform.physarray

"""
Physical Array Class

This module contains the PhysArray class, which is the primary object passed along
edges of a Data Flow graph.  It is a subclass of the Numpy MaskedArray, and carries with
its data the units associated with the data, the dimensions associated with each axis of
the data.

Copyright 2017-2020, University Corporation for Atmospheric Research
LICENSE: See the LICENSE.rst file for details
"""

from operator import mul, truediv
from os import linesep

import numpy
from cf_units import Unit

from pyconform.indexing import align_index

_ALPHAS_ = "abcdefghijklmnopqrstuvwxyz"


[docs]class UnitsError(ValueError): """Exception indicating an error involving units of a PhysArray object"""
[docs]class DimensionsError(ValueError): """Exception indicating an error involving dimensions of a PhysArray object"""
[docs]def getdata(obj): """ Retrieve the ndarray data associated with an object """ return numpy.ma.getdata(obj)
[docs]def getmask(obj): """ Retrieve the mask associated with an object """ return numpy.ma.getmask(obj)
[docs]def getdtype(obj): """ Get the dtype associated with an object """ return numpy.asarray(obj).dtype
[docs]def ischartype(obj): """ Return whether the object is a string/character type """ return getdtype(obj).char in ("S", "U")
[docs]def getshape(obj): """ Get the shape associated with an object """ if isinstance(obj, PhysArray): return obj.shape elif ischartype(obj): return CharArray._chararray_(obj).shape else: return numpy.shape(obj)
[docs]def getname(obj): """ Retrieve the string name associated with an object """ if isinstance(obj, PhysArray): return obj.name else: return str(obj).replace(linesep, " ")
[docs]def getunits(obj): """ Retrieve the Unit associated with the object """ if isinstance(obj, PhysArray): return obj.units elif ischartype(obj): return Unit("no unit") else: return Unit(1)
[docs]def getdimensions(obj): """ Retrieve the dimensions-tuple associated with the object """ if isinstance(obj, PhysArray): return obj.dimensions else: return tuple(reversed(range(len(getshape(obj)))))
[docs]def getpositive(obj): """ Retrieve the positive attribute associated with the object """ if isinstance(obj, PhysArray): return obj.positive else: return None
[docs]class PhysArray(numpy.ma.MaskedArray): """ A PhysArray is an array of data with both units and dimensions The PhysArray is deried from the Numpy MaskedArray and is the basic object along the edges of a Data Flow graph. """ def __new__( cls, indata, name=None, units=None, dimensions=None, positive="", **kwds ): makwds = {k: kwds[k] for k in ["dtype"] if k in kwds} obj = numpy.ma.asarray(indata, **makwds).view(cls) if obj.dtype.char in ("S", "U"): return CharArray(indata, name=name, dimensions=dimensions) # Add the mask if specified if "mask" in kwds: obj.mask = kwds["mask"] # Store a name associated with the object if name is None: obj.name = getname(indata) else: obj.name = name # Store units of the data if units is None: obj.units = getunits(indata) else: obj.units = units # Store dimension names associated with each axis if dimensions is None: obj.dimensions = getdimensions(indata) else: obj.dimensions = dimensions # Set the positive direction for the data if positive == "": obj.positive = getpositive(indata) else: obj.positive = positive return obj def __repr__(self): datstr = super(PhysArray, self).__str__().replace(linesep, " ") posstr = ( "" if self.positive is None else ", positive={!r}".format(self.positive) ) return ( "{!s}(data={!s}, fill_value={!s}, units={!r}, name={!r}, dimensions=" "{!s}{})" ).format( self.__class__.__name__, datstr, self.fill_value, str(self.units), self.name, self.dimensions, posstr, ) @property def name(self): """String name for the data""" return self._optinfo["name"] @name.setter def name(self, nm): """String name for the data""" self._optinfo["name"] = nm def __str__(self): return "{}".format(self.name) @property def units(self): """Units of the data""" return self._optinfo["units"] @units.setter def units(self, u): """Units of the data""" self._optinfo["units"] = u if isinstance(u, Unit) else Unit(u) @staticmethod def _safe_convert_(obj, units1, units2): # Because netcdftime datetime conversion always returns an NDArray, even if the # original object is a subclass of NDArray, we have to wrap the convert function # to safely preserve the object type... sigh. u1 = units1 if isinstance(units1, Unit) else Unit(units1) u2 = units2 if isinstance(units2, Unit) else Unit(units2) if isinstance(obj, PhysArray): new_array = numpy.ma.MaskedArray( units1.convert(obj.data, units2), mask=obj.mask, dtype=obj.dtype ) u1_str = "{}".format(u1) + ( "|{}".format(u1.calendar) if u1.calendar else "" ) u2_str = "{}".format(u2) + ( "|{}".format(u2.calendar) if u2.calendar else "" ) new_name = "convert({}, from={}, to={})".format(obj.name, u1_str, u2_str) return PhysArray( new_array, name=new_name, units=u2, dimensions=obj.dimensions ) elif isinstance(obj, numpy.ma.MaskedArray): return numpy.ma.MaskedArray( u1.convert(obj.data, u2), mask=obj.mask, dtype=obj.dtype ) else: return u1.convert(obj, u2)
[docs] def convert(self, units): """ Return a new PhysArray with new units Parameters: units (Unit): The new units to which to convert the PhysArray """ uunit = units if isinstance(units, Unit) else Unit(units) if self.units == uunit: return self elif self.units.is_convertible(uunit): return PhysArray._safe_convert_(self, self.units, uunit) else: raise UnitsError( "Cannot convert units {!r} to {!r}".format(self.units, uunit) )
@property def dimensions(self): """Named dimensions of the data""" return self._optinfo["dimensions"] @dimensions.setter def dimensions(self, dims): """Named dimensions of the data""" if not isinstance(dims, (list, tuple)): raise TypeError("Dimensions must be a tuple, not {}".format(type(dims))) if len(dims) != len(self.shape): raise ValueError( "Dimensions {} must have same length as shape {}".format( dims, self.shape ) ) self._optinfo["dimensions"] = tuple(dims)
[docs] def transpose(self, *dims): """ Return a new PhysArray with dimensions transposed in the order given Does nothing if no transpose is necesary Parameters: dims (tuple): Tuple of dimension names in the new order """ if len(dims) == 1 and isinstance(dims[0], (list, tuple)): dims = tuple(dims[0]) if set(dims) == set(self.dimensions): new_dims = tuple(dims) axes = tuple(self.dimensions.index(d) for d in dims) elif set(dims) == set(range(self.ndim)): new_dims = tuple(self.dimensions[i] for i in dims) axes = dims else: raise DimensionsError( ("Cannot transpose dimensions/axes {} to {}").format( self.dimensions, dims ) ) if new_dims == self.dimensions: return self else: old_dims_str = ",".join([str(d) for d in self.dimensions]) new_dims_str = ",".join([str(d) for d in new_dims]) new_name = "transpose({}, from=[{}], to=[{}])".format( self.name, old_dims_str, new_dims_str ) return PhysArray( super(PhysArray, self).transpose(*axes), dimensions=new_dims, name=new_name, )
@property def positive(self): """Positive direction (up or down) for the data""" return self._optinfo["positive"] @positive.setter def positive(self, pos): """Positive direction (up or down) for the data""" if isinstance(pos, str): strpos = str(pos).lower() if strpos not in ["up", "down"]: raise ValueError( "Positive attribute must be up/down or None, not {!r}".format(pos) ) pos = strpos elif pos is not None: raise ValueError( "Positive attribute must be up/down or None, not {!r}".format(pos) ) self._optinfo["positive"] = pos
[docs] def flip(self): """ Flip the direction of the positive attribute, if set, and correspondingly multiply by -1 Does nothing if the positive attribute is not set (i.e., equals None) """ if self.positive is not None: nm = self.name self *= -1 self.positive = "up" if self.positive == "down" else "down" self.name = "{}({})".format(self.positive, nm) return self
[docs] def up(self): """ Set the direction of the positive attribute to 'up' and multiply by -1, if necessary Only multiplies by -1 if the positive attribute is already set to 'down'. """ if self.positive is None: self.positive = "up" self.name = "up({})".format(self.name) elif self.positive == "down": self.flip() return self
[docs] def down(self): """ Set the direction of the positive attribute to 'down' and multiply by -1, if necessary Only multiplies by -1 if the positive attribute is already set to 'up'. """ if self.positive is None: self.positive = "down" self.name = "down({})".format(self.name) elif self.positive == "up": self.flip() return self
def __getitem__(self, index): idx = align_index(index, self.dimensions) if len(idx) == 0: return self else: dimensions = tuple( d for i, d in zip(idx, self.dimensions) if isinstance(i, slice) ) if dimensions != self.dimensions: return PhysArray( super(PhysArray, self).__getitem__(idx), dimensions=dimensions ) else: return super(PhysArray, self).__getitem__(idx) def __setitem__(self, index, values): idx = align_index(index, self.dimensions) if isinstance(values, PhysArray): values = values.convert(self.units).transpose(self.dimensions) super(PhysArray, self).__setitem__(idx, values) def _broadcast_(self, other): for d in set(self.dimensions).intersection(set(other.dimensions)): if ( self.shape[self.dimensions.index(d)] != other.shape[other.dimensions.index(d)] ): raise DimensionsError( "Cannot broadcast arrays with dimensions {} and " "{}".format(self.dimensions, other.dimensions) ) self_dims = self.dimensions + tuple( d for d in other.dimensions if d not in self.dimensions ) self.shape = tuple( self.shape[self.dimensions.index(d)] if d in self.dimensions else 1 for d in self_dims ) other_dims = other.dimensions + tuple( d for d in self.dimensions if d not in other.dimensions ) other.shape = tuple( other.shape[other.dimensions.index(d)] if d in other.dimensions else 1 for d in other_dims ) if len(self.dimensions) > 0 and self_dims != self.dimensions: fromdims = ",".join([str(d) for d in self.dimensions]) todims = ",".join([str(d) for d in self_dims]) self.name = "broadcast({}, from=[{}], to=[{}])".format( self.name, fromdims, todims ) self.dimensions = self_dims if len(other.dimensions) > 0 and other_dims != other.dimensions: fromdims = ",".join([str(d) for d in other.dimensions]) todims = ",".join([str(d) for d in other_dims]) other.name = "broadcast({}, from=[{}], to=[{}])".format( other.name, fromdims, todims ) other.dimensions = other_dims return other.transpose(self_dims) def _match_positive_(self, other): if self.positive == other.positive: pass elif self.positive is None: if other.positive == "up": self.up() elif other.positive == "down": self.down() elif other.positive is None: if self.positive == "up": other.up() elif self.positive == "down": other.down() else: other.flip() def _check_inplace_(self, other): other = PhysArray(other) sdims = set(self.dimensions) odims = set(other.dimensions) if not odims.issubset(sdims): for d in odims - sdims: if other.shape[other.dimensions.index(d)] > 1: raise DimensionsError( ( "Cannot broadcast dimensions {} to {} in inplace operation" "" ).format(other.dimensions, self.dimensions) ) def _add_sub_init_(self, other): other = self._broadcast_(PhysArray(other)).convert(self.units) self._match_positive_(other) return other def __add__(self, other): result = PhysArray(self) other = result._add_sub_init_(other) return PhysArray( super(PhysArray, result).__add__(other), name="({}+{})".format(result.name, other.name), units=result.units, positive=result.positive, ) def __radd__(self, other): return PhysArray(other).__add__(self) def __iadd__(self, other): self._check_inplace_(other) other = self._add_sub_init_(other) super(PhysArray, self).__iadd__(other) self.name = "({}+{})".format(self.name, other.name) return self def __sub__(self, other): result = PhysArray(self) other = result._add_sub_init_(other) return PhysArray( super(PhysArray, result).__sub__(other), name="({}-{})".format(result.name, other.name), units=result.units, positive=result.positive, ) def __rsub__(self, other): return PhysArray(other).__sub__(self) def __isub__(self, other): self._check_inplace_(other) other = self._add_sub_init_(other) super(PhysArray, self).__isub__(other) self.name = "({}-{})".format(self.name, other.name) return self def _units_op_(self, val, op): sunits = self.units try: units = op(sunits, val) except: opnm = str(op.__name__) raise UnitsError( "Operator {!r} failed with units: {}, {}".format(opnm, sunits, val) ) return units def _mul_div_init_(self, other): other = self._broadcast_(PhysArray(other)) self._match_positive_(other) return other def __mul__(self, other): result = PhysArray(self) other = result._mul_div_init_(other) return PhysArray( super(PhysArray, result).__mul__(other), name="({}*{})".format(result.name, other.name), units=self._units_op_(other.units, mul), positive=result.positive, ) def __rmul__(self, other): return PhysArray(other).__mul__(self) def __imul__(self, other): self._check_inplace_(other) other = self._mul_div_init_(other) super(PhysArray, self).__imul__(other) self.name = "({}*{})".format(self.name, other.name) self.units = self._units_op_(other.units, mul) return self
[docs] def invert(self): """Return a new PhysArray with the value of the array inverted (1/value)""" return PhysArray( 1.0 / self, dimensions=self.dimensions, units=self.units.invert(), name="(1/{!s})".format(self), positive=self.positive, )
def __truediv__(self, other): result = PhysArray(self) other = result._mul_div_init_(other) return PhysArray( super(PhysArray, result).__truediv__(other), name="({}/{})".format(result.name, other.name), units=self._units_op_(other.units, truediv), positive=result.positive, ) def __rtruediv__(self, other): return PhysArray(other).__truediv__(self) def __itruediv__(self, other): self._check_inplace_(other) other = self._mul_div_init_(other) super(PhysArray, self).__itruediv__(other) self.name = "({}/{})".format(self.name, other.name) self.units = self._units_op_(other.units, truediv) return self def __floordiv__(self, other): result = PhysArray(self) other = result._mul_div_init_(other) units = self.units / other.units return PhysArray( super(PhysArray, result).__floordiv__(other), name="({}//{})".format(result.name, other.name), units=units, positive=result.positive, ) def __rfloordiv__(self, other): return PhysArray(other).__floordiv__(self) def __ifloordiv__(self, other): self._check_inplace_(other) other = self._mul_div_init_(other) super(PhysArray, self).__ifloordiv__(other) self.name = "({}//{})".format(self.name, other.name) self.units = self.units / other.units return self def __mod__(self, other): raise NotImplementedError("Modulus of a PhysArray is not defined.") def __rmod__(self, other): return PhysArray(other).__mod__(self) def __imod__(self, other): raise NotImplementedError("Modulus of a PhysArray is not defined.") def _check_exponent_(self, exp): if exp.dimensions != (): raise DimensionsError("Exponents must be scalar: {}".format(exp)) if exp.positive is not None: raise ValueError("Exponents cannot have positive attribute: {}".format(exp)) if not exp.units.is_convertible(Unit(1)): raise UnitsError("Exponents cannot have physical units") return exp.convert(Unit(1)) def __pow__(self, other): other = self._check_exponent_(PhysArray(other)) positive = None if other.data % 2 == 0 else self.positive return PhysArray( super(PhysArray, self).__pow__(other), units=self.units ** other, name="({!s}**{!s})".format(self, other), positive=positive, ) def __rpow__(self, other): return PhysArray(other).__pow__(self) def __ipow__(self, other): other = self._check_exponent_(PhysArray(other)) super(PhysArray, self).__ipow__(other) self.name = "({!s}**{!s})".format(self, other) self.units **= other self.positive = None if other.data % 2 == 0 else self.positive return self
[docs] def mean(self, dimensions=None, **kwds): if dimensions is None: axis = kwds["axis"] if "axis" in kwds else None elif isinstance(dimensions, (list, tuple)): axis = tuple( i for i in sorted(self.dimensions.index(d) for d in dimensions) ) else: raise TypeError("Dimensions must be given as a list or tuple") if axis is None: dims = self.dimensions meanval = self.view(numpy.ma.MaskedArray).mean() elif isinstance(axis, int): dims = (self.dimensions[axis],) meanval = self.view(numpy.ma.MaskedArray).mean(axis=axis) elif isinstance(axis, (list, tuple)): dims = tuple(self.dimensions[i] for i in axis) meanval = self.view(numpy.ma.MaskedArray) for a in axis: meanval = meanval.mean(axis=a) else: raise TypeError("Axis must be given as an integer, list or tuple") new_dims = tuple(d for d in self.dimensions if d not in dims) dim_str = ",".join(str(d) for d in dims) return PhysArray( meanval, name="mean({}, dims=[{}])".format(self.name, dim_str), dimensions=new_dims, positive=self.positive, units=self.units, )
[docs] def sum(self, dimensions=None, **kwds): if dimensions is None: axis = kwds["axis"] if "axis" in kwds else None elif isinstance(dimensions, (list, tuple)): axis = tuple( i for i in sorted(self.dimensions.index(d) for d in dimensions) ) else: raise TypeError("Dimensions must be given as a list or tuple") if axis is None: dims = self.dimensions sumval = self.view(numpy.ma.MaskedArray).sum() elif isinstance(axis, int): dims = (self.dimensions[axis],) sumval = self.view(numpy.ma.MaskedArray).sum(axis=axis) elif isinstance(axis, (list, tuple)): dims = tuple(self.dimensions[i] for i in axis) sumval = self.view(numpy.ma.MaskedArray) for a in axis: sumval = sumval.mean(axis=a) else: raise TypeError("Axis must be given as an integer, list or tuple") new_dims = tuple(d for d in self.dimensions if d not in dims) dim_str = ",".join(str(d) for d in dims) return PhysArray( sumval, name="sum({}, dims=[{}])".format(self.name, dim_str), dimensions=new_dims, positive=self.positive, units=self.units, )
[docs]class CharArray(PhysArray): """ Special kind of PhysArray to deal with string arrays """ def __new__(cls, indata, name=None, dimensions=None): obj = numpy.ma.asarray(indata, dtype="S") if len(obj.shape) == 0: obj = obj.reshape(1).view("S1") else: strlen = obj.dtype.itemsize shape = obj.shape + ((strlen,) if strlen > 1 else tuple()) obj = obj.view("S1").reshape(shape) obj = numpy.ma.masked_where(obj == b"", obj).view(cls) obj.fill_value = b"" # Store a name associated with the object if name is None: obj.name = getname(indata) else: obj.name = name # Store units of the data obj.units = Unit("no unit") # Store dimension names associated with each axis if dimensions is None: obj.dimensions = getdimensions(indata) else: obj.dimensions = dimensions # Set the positive direction for the data obj.positive = None return obj @staticmethod def _chararray_(indata): obj = CharArray._strarray_(indata) if len(obj.shape) == 0: obj = obj.reshape(1).view("S1") else: strlen = obj.dtype.itemsize shape = obj.shape + ((strlen,) if strlen > 1 else tuple()) obj = obj.view("S1").reshape(shape) return obj @staticmethod def _strarray_(indata): return numpy.asarray(indata, dtype="S") def __repr__(self): if self.shape[-1] > 0: prndat = self.data.view("S{}".format(self.shape[-1])).reshape( self.shape[:-1] ) else: prndat = self.data datstr = str(prndat).replace(linesep, " ") return ("{!s}(data={!s}, name={!r}, dimensions=" "{!s})").format( self.__class__.__name__, datstr, self.name, self.dimensions ) @property def units(self): """Units of the data""" return self._optinfo["units"] @units.setter def units(self, units): new_units = units if isinstance(units, Unit) else Unit(units) if not new_units.is_no_unit(): raise UnitsError("CharArrays cannot have units.") self._optinfo["units"] = new_units @property def positive(self): """Positive direction (up or down) for the data""" return self._optinfo["positive"] @positive.setter def positive(self, pos): if pos is not None: raise ValueError("CharArrays cannot be assigned a positive attribute") self._optinfo["positive"] = pos
[docs] def convert(self, units): try: new_self = PhysArray.convert(self, units) except UnitsError: raise UnitsError( "CharArrays do not have units and cannot be converted to units {}".format( units ) ) return new_self
[docs] def transpose(self, *dims): if set(dims) == set(self.dimensions) and dims[-1] != self.dimensions[-1]: raise DimensionsError( "The last dimension of a CharArray must always be the string length. " "Cannot transpose." ) return PhysArray.transpose(self, *dims)
[docs] def invert(self): raise NotImplementedError("CharArrays cannot be inverted")
[docs] def stretch(self, newlen): if newlen > self.shape[-1]: pad = numpy.zeros((self.shape[:-1] + (newlen - self.shape[-1],)), dtype="S") pad = numpy.ma.masked_where(pad == "", pad) return CharArray( numpy.ma.concatenate((self, pad), axis=-1), name=self.name, dimensions=self.dimensions, ) else: return self