Source code for cdms2.cdmsNode

# Automatically adapted for numpy.oldnumeric Aug 01, 2007 by
# Further modified to be pure new numpy June 24th 2008

"""
CDMS node classes
"""
import numpy
from numpy import get_printoptions, set_printoptions, inf
from . import CDML
import cdtime
import re
import string
import sys
from .error import CDMSError
from six import string_types

# Regular expressions
# Note: allows digit as first character
_Name = re.compile('[a-zA-Z0-9_:][-a-zA-Z0-9._:]*$')
_Integer = re.compile('[0-9]+$')
_ArraySep = re.compile('[\[\],\s]+')
# " illegal chars in content
_Illegal = re.compile('([<>&\"\'])|([^\t\r\n -\176\240-\377])')

# Data types

CdChar = CDML.CdChar
CdByte = CDML.CdByte
CdShort = CDML.CdShort
CdInt = CDML.CdInt
CdUInt = CDML.CdUInt
CdLong = CDML.CdLong
CdLongLong = CDML.CdLongLong
CdInt64 = CDML.CdInt64
CdFloat = CDML.CdFloat
CdDouble = CDML.CdDouble
CdString = CDML.CdString
CdUByte = CDML.CdUByte
CdUShort = CDML.CdUShort
CdUInt64 = CDML.CdUInt64
CdULong = CDML.CdULong
CdULongLong = CDML.CdULongLong

CdFromObject = CDML.CdFromObject
CdAny = CDML.CdAny
CdDatatypes = [
    CdChar,
    CdByte,
    CdShort,
    CdInt,
    CdUInt,
    CdLong,
    CdInt64,
    CdFloat,
    CdDouble,
    CdString,
    CdUByte,
    CdUShort,
    CdUInt64,
    CdULong,
    CdULongLong]

CdScalar = CDML.CdScalar
CdArray = CDML.CdArray

NumericToCdType = {numpy.dtype(numpy.single).char: CdFloat,
                   numpy.dtype(numpy.double).char: CdDouble,
                   numpy.dtype(numpy.short).char: CdShort,
                   numpy.dtype(numpy.intc).char: CdInt,
                   numpy.dtype(numpy.uintc).char: CdUInt,
                   numpy.dtype(numpy.int64).char: CdInt64,
                   numpy.dtype(numpy.byte).char: CdByte,
                   numpy.dtype(numpy.longlong).char: CdLongLong,
                   'c': CdChar,
                   numpy.dtype(numpy.ubyte).char: CdUByte,
                   numpy.dtype(numpy.ushort).char: CdUShort,
                   numpy.dtype(numpy.uint64).char: CdUInt64,
                   numpy.dtype(numpy.ulonglong).char: CdULongLong,
                   'S': CdString
                   }

CdToNumericType = {CdChar: 'c',
                   CdByte: numpy.byte,
                   CdUByte: numpy.ubyte,
                   CdShort: numpy.short,
                   CdUShort: numpy.ushort,
                   CdInt: numpy.intc,
                   CdUInt: numpy.uintc,
                   CdLong: numpy.int_,
                   CdInt64: numpy.int64,
                   CdUInt64: numpy.uint64,
                   CdULongLong: numpy.ulonglong,
                   CdFloat: numpy.single,
                   CdDouble: numpy.double,
                   CdString: numpy.str_}

# Grid types
UnknownGridType = "unknown"
GaussianGridType = "gaussian"
UniformGridType = "uniform"
CdGridtypes = [UnknownGridType, GaussianGridType, UniformGridType]

DuplicateIdError = "Duplicate identifier: "
InvalidArgumentError = "Invalid argument: "
InvalidDatatype = "Invalid datatype: "
InvalidGridtype = "Invalid grid type: "
InvalidIdError = "Invalid identifier: "
NotMonotonic = "Result array is not monotonic "


[docs]class NotMonotonicError(CDMSError): pass
# Array representation CdVector = 1 CdLinear = 2 # Monotonicity CdNotMonotonic = 0 CdIncreasing = -1 CdDecreasing = 1 CdSingleton = 2 # Map illegal XML characters to entity references: # '<' --> &lt; # '>' --> &gt; # '&' --> &amp; # '"' --> &quot; # "'" --> &apos; # all other illegal characters are removed #"
[docs]def mapIllegalToEntity(matchobj): s = matchobj.group(0) if s == '<': return '&lt;' elif s == '>': return '&gt;' elif s == '&': return '&amp;' elif s == '"': # " return '&quot;' elif s == "'": return '&apos;' else: return ""
# Named node
[docs]class CdmsNode: def __init__(self, tag, id=None, parent=None): if id and _Name.match(id) is None: raise CDMSError(InvalidIdError + id) # External attributes, attribute[name]=(value,cdDatatype) self.attribute = {} self.child = [] # Children self.id = id # Identifier string self.parent = parent # Parent node in a tree, None for root self.tag = tag # XML tag string self.content = None # XML content string # CDML Document Type Definition for this tag self.dtd = CDML.CDML().dtd.get(self.tag) self.extra = CDML.CDML().extra.get(self.tag) # Extra datatype constraints CdmsNode.mapToExternal(self) # Don't call subclass mapToExternal! # Map to external attributes
[docs] def mapToExternal(self): if self.id is not None and _Name.match(self.id) is None: raise CDMSError(InvalidIdError + self.id) if self.id is not None: self.setExternalAttr('id', self.id)
# Set content from a string. The interpretation # of content is class-dependent
[docs] def setContentFromString(self, content): self.content = content
# Get content
[docs] def getContent(self): return self.content
# Add a child node
[docs] def add(self, child): if child is not None: self.child.append(child) child.parent = self return child
# Return a list of child nodes
[docs] def children(self): return self.child
# Get the child node at index k
[docs] def getChildAt(self, index): return self.child[index]
# Remove and return the child at index k
[docs] def removeChildAt(self, index): child = self.child[index] self.child = self.child[:index] + self.child[index + 1:] return child
# Get the number of children
[docs] def getChildCount(self): return len(self.child)
# Get the index of a node
[docs] def getIndex(self, node): index = -1 for i in range(len(self.child)): if node is self.child[i]: index = i break return index
# Get the parent node
[docs] def getParent(self): return self.parent
# True iff node is a leaf node
[docs] def isLeaf(self): return self.child == []
# Set an external attribute # 'attr' is an Attr object
[docs] def setExternalAttrFromAttr(self, attr): if attr.value is None: return self.attribute[attr.name] = (attr.value, attr.getDatatype())
# Get an external attribute, as an Attr instance
[docs] def getExternalAttrAsAttr(self, name): attrPair = self.attribute.get(name) if attrPair: (value, datatype) = attrPair attr = AttrNode(name, value) attr.datatype = datatype return attr else: return None
# Set an external attribute
[docs] def setExternalAttr(self, name, value, datatype=None): attr = AttrNode(name, value) attr.datatype = datatype self.setExternalAttrFromAttr(attr)
# Get an external attribute
[docs] def getExternalAttr(self, name): attrPair = self.attribute.get(name) if attrPair: (value, datatype) = attrPair return value else: return None
# Get a dictionary of external attributes, of form (value,datatype)
[docs] def getExternalDict(self): return self.attribute
# Set the external attribute dictionary. The input dictionary # is of the form {name:value,...} where value is a string.
[docs] def setExternalDict(self, dict): for key in list(dict.keys()): self.attribute[key] = (dict[key], CdString)
# Write to a file, with formatting. # tablevel is the start number of tabs
[docs] def write(self, fd=None, tablevel=0, format=1): if fd is None: fd = sys.stdout printLimit = get_printoptions()['threshold'] # Ensure that all Numeric array values will be printed set_printoptions(threshold=inf) if self.dtd: validAttrs = list(self.dtd.keys()) else: validAttrs = None if format: fd.write(tablevel * '\t') fd.write('<' + self.tag) if format: fd.write('\n') # Write valid attributes for attname in list(self.attribute.keys()): if (validAttrs and (attname in validAttrs)) or (not validAttrs): if format: fd.write((tablevel + 1) * '\t') (attval, datatype) = self.attribute[attname] # attvalstr = string.replace(str(attval),'"',"'") # Map " to ' attvalstr = _Illegal.sub( mapIllegalToEntity, str(attval)) # Map illegal chars to entities if format: fd.write(attname + '\t="' + attvalstr + '"') else: fd.write(' ' + attname + '="' + attvalstr + '"') if format: fd.write('\n') if format: fd.write((tablevel + 1) * '\t') fd.write('>') if format: fd.write('\n') # Write extra attributes for attname in list(self.attribute.keys()): if validAttrs and (attname not in validAttrs): (attval, datatype) = self.attribute[attname] attr = AttrNode(attname, attval) attr.datatype = datatype attr.mapToExternal() attr.write(fd, tablevel + 1, format) # Write content content = self.getContent() if content is not None: content = _Illegal.sub( mapIllegalToEntity, content) # Map illegal chars to entities if format: fd.write((tablevel + 1) * '\t') fd.write(content) if format: fd.write('\n') # Write children for node in self.child: node.write(fd, tablevel + 1, format) if format: fd.write((tablevel + 1) * '\t') fd.write('</' + self.tag + '>') if format: fd.write('\n') set_printoptions(threshold=printLimit) # Restore original
# Write to a file without formatting.
[docs] def write_raw(self, fd=None): if fd is None: fd = sys.stdout self.write(fd, 0, 0)
# Write an LDIF (LDAP interchange format) entry # parentdn is the parent LDAP distinguished name # userAttrs is a string or list of strings of form "attr: value" # A trailing newline is added iff format==1 # Note: unlike write, this does not write children as well
[docs] def write_ldif(self, parentdn, userAttrs=[], fd=None, format=1): if fd is None: fd = sys.stdout if self.dtd: validAttrs = list(self.dtd.keys()) else: validAttrs = None # Write distinguished name newdn = "%s=%s,%s" % (self.tag, self.id, parentdn) fd.write("dn: %s\n" % newdn) # Write valid attributes for attname in list(self.attribute.keys()): if (validAttrs and (attname in validAttrs)) or (not validAttrs): (attval, datatype) = self.attribute[attname] # attvalstr = _Illegal.sub(mapIllegalToEntity,str(attval)) # # Map illegal chars to entities if isinstance(attval, string_types): attval = repr(attval) attvalstr = string.strip(attval) # Make sure continuation lines are preceded with a space attvalstr = re.sub('\n', '\n ', attvalstr) if attvalstr == '': attvalstr = "none" fd.write("%s: %s\n" % (attname, attvalstr)) # Write extra attributes for attname in list(self.attribute.keys()): if validAttrs and (attname not in validAttrs): (attval, datatype) = self.attribute[attname] if isinstance(attval, string_types): attval = repr(attval) # Make sure continuation lines are preceded with a space attval = re.sub('\n', '\n ', attval) fd.write("attr: %s=%s\n" % (attname, attval)) # Write content # content = self.getContent() # if content is not None: # content = _Illegal.sub(mapIllegalToEntity,content) # Map illegal chars to entities # fd.write("value: %s"%(content,)) # Write user attributes if isinstance(userAttrs, string_types): newAttrs = [userAttrs] else: newAttrs = userAttrs for entry in list(newAttrs): fd.write("%s\n" % entry) # Write classes fd.write("objectclass: top\n") fd.write("objectclass: %s\n" % (self.tag)) if format == 1: fd.write('\n') return newdn
# Validate attributes
[docs] def validate(self, idtable=None): # Check validity of enumerated values and references validKeys = list(self.dtd.keys()) for attname in list(self.attribute.keys()): if attname in validKeys: (atttype, default) = self.dtd[attname] if isinstance(atttype, tuple): attval = self.getExternalAttr(attname) assert attval in atttype, 'Invalid attribute %s=%s must be in %s' % ( attname, attval, repr(atttype)) elif atttype == CDML.Idref: attval = self.getExternalAttr(attname) if idtable: if attval not in idtable: print( 'Warning: ID reference not found: %s=%s' % (attname, attval)) # Validate children for node in self.children(): node.validate(idtable)
# Container object for other CDMS objects
[docs]class DatasetNode(CdmsNode): def __init__(self, id): CdmsNode.__init__(self, "dataset", id) self.idtable = {} # Validate the dataset and all child nodes
[docs] def validate(self, idtable=None): if not idtable: idtable = self.idtable CdmsNode.validate(self, idtable)
# Add a child node with an ID
[docs] def addId(self, id, child): if id in self.idtable: raise CDMSError(DuplicateIdError + id) CdmsNode.add(self, child) self.idtable[id] = child return child
# Get a child node from its ID
[docs] def getChildNamed(self, id): return self.idtable.get(id)
# Get the ID table
[docs] def getIdDict(self): return self.idtable
# Dump to a CDML file. # path is the file to dump to, or None for standard output. # if format is true, write with tab, newline formatting
[docs] def dump(self, path=None, format=1): if path: try: fd = open(path, 'w') except IOError: raise IOError('%s: %s' % (sys.exc_info()[1], path)) else: fd = sys.stdout fd.write('<?xml version="1.0"?>') if format: fd.write('\n') fd.write( '<!DOCTYPE dataset SYSTEM "http://www-pcmdi.llnl.gov/software/cdms/cdml.dtd">') if format: fd.write('\n') self.write(fd, 0, format) if fd != sys.stdout: fd.close()
# Spatio-temporal variable # Two ways to create a variable: # (1) var = VariableNode(id,datatype,domain) # (2) var = VariableNode(id,datatype) # var.setDomain(domain)
[docs]class VariableNode(CdmsNode): # Create a variable. # If validate is true, validate immediately def __init__(self, id, datatype, domain): assert isinstance( datatype, string_types), 'Invalid datatype: ' + repr(datatype) assert datatype in CdDatatypes, 'Invalid datatype: ' + repr(datatype) assert datatype in CdDatatypes, 'Invalid datatype: ' + repr(datatype) CdmsNode.__init__(self, "variable", id) self.datatype = datatype self.setDomain(domain) VariableNode.mapToExternal(self) # Set the domain
[docs] def setDomain(self, domain): if not self.isLeaf(): self.removeChildAt(0) self.add(domain)
# Get the domain
[docs] def getDomain(self): if self.getChildCount() > 0: return self.getChildAt(0) else: return None
# Map to external attributes
[docs] def mapToExternal(self): self.setExternalAttr('datatype', self.datatype)
# Coordinate axis
[docs]class AxisNode(CdmsNode): # If datatype is None, assume values [0,1,..,length-1] # data is a numpy array, if specified def __init__(self, id, length, datatype=CdLong, data=None): assert isinstance(length, int), 'Invalid length: ' + repr(length) assert isinstance( datatype, string_types), 'Invalid datatype: ' + repr(datatype) assert datatype in CdDatatypes, 'Invalid datatype: ' + repr(datatype) if data is not None: assert isinstance( data, numpy.ndarray), 'data must be a 1-D Numeric array' CdmsNode.__init__(self, "axis", id) self.datatype = datatype self.data = data # data representation is CdLinear or CdVector # If vector, self.data is a numpy array # and the content is the array string representation # If linear, the linear node is a child node # and the content is empty self.dataRepresent = None # An array of integer indices, shape (2,self.length) if defined self.partition = None # Actual number of data values, for a linear, partitioned axis self.partition_length = 0 if data is not None: self.setData(data) else: self.length = length AxisNode.mapToExternal(self) # Map to external attributes
[docs] def mapToExternal(self): self.setExternalAttr('datatype', self.datatype) self.setExternalAttr('length', self.length)
# Set data from content string # The content of an axis is the data array.
[docs] def setContentFromString(self, datastring): datatype = self.datatype numericType = CdToNumericType.get(datatype) if numericType is None: raise CDMSError(InvalidDatatype + datatype) stringlist = _ArraySep.split(datastring) numlist = [] for numstring in stringlist: if numstring == '': continue numlist.append(float(numstring)) if len(numlist) > 0: # NB! len(zero-length array) causes IndexError on Linux! dataArray = numpy.array(numlist, numericType) self.data = dataArray self.length = len(self.data)
# Set the partition from a string. This does not # set the external string representation
[docs] def setPartitionFromString(self, partstring): stringlist = _ArraySep.split(partstring) numlist = [] for numstring in stringlist: if numstring == '': continue numlist.append(int(numstring)) dataArray = numpy.array(numlist, numpy.int) if len(dataArray) > 0: self.partition = dataArray
# Get the content string: the data values if the representation # is as a vector, or ane empty string otherwise
[docs] def getContent(self): if self.data is None or self.dataRepresent == CdLinear: return '' else: return str(self.data)
# Set the data as an array, check for monotonicity
[docs] def setData(self, data): # If this axis is currently linear, remove the linear node if self.dataRepresent == CdLinear: index = self.getIndex(self.data) self.removeChildAt(index) self.data = data self.dataRepresent = CdVector self.length = len(data) self.setExternalAttr('length', self.length) if self.monotonicity() == CdNotMonotonic: raise NotMonotonicError(NotMonotonic)
# Get the data as an array
[docs] def getData(self): if self.dataRepresent == CdLinear: return self.data.toVector(self.datatype) else: return self.data
# Set the data as a linear vector # If the partition is set, derive the vector length from it
[docs] def setLinearData(self, linearNode, partition=None): self.data = linearNode if self.getChildCount() > 0: self.removeChildAt(0) # Remove the previous linear node self.add(linearNode) self.dataRepresent = CdLinear # self.length = linearNode.getExternalAttr('length') if partition is None: self.length = linearNode.length else: self.partition = partition self.length = partition[-1] linearNode.length = self.length self.setExternalAttr('partition', str(self.partition)) self.setExternalAttr('length', self.length)
# Test if axis data vectors are equal
[docs] def equal(self, axis): # Require that partitions (if any) are equal if self.partition is not None and axis.partition is not None: if len(self.partition) != len(axis.partition): return 0 if not numpy.alltrue(numpy.equal(self.partition, axis.partition)): return 0 elif self.partition is not None or axis.partition is not None: return 0 if self.dataRepresent == axis.dataRepresent == CdVector: try: return numpy.alltrue(numpy.equal(self.data, axis.data)) except ValueError: return 0 elif self.dataRepresent == axis.dataRepresent == CdLinear: return self.data.equal(axis.data) elif self.dataRepresent == CdVector: return axis.data.equalVector(self.data) else: return self.data.equalVector(axis.data)
# Test if axis data vectors are element-wise close # True iff for each respective element a and b, abs((b-a)/b)<=eps
[docs] def isClose(self, axis, eps): if eps == 0: return self.equal(axis) if self.dataRepresent == axis.dataRepresent == CdVector: try: return numpy.alltrue(numpy.less_equal(numpy.absolute( self.data - axis.data), numpy.absolute(eps * self.data))) except ValueError: return 0 elif self.dataRepresent == axis.dataRepresent == CdLinear: return self.data.isClose(axis.data, eps) elif self.dataRepresent == CdVector: return axis.data.isCloseVector(self.data, eps) else: return self.data.isCloseVector(axis.data, eps)
# Test for strict monotonicity. # Returns CdNotMonotonic, CdIncreasing, CdDecreasing, or CdSingleton
[docs] def monotonicity(self): if self.dataRepresent == CdLinear: return self.data.monotonicity() elif self.length == 1: return CdSingleton else: first = self.data[:-1] second = self.data[1:] if numpy.alltrue(numpy.less(first, second)): return CdIncreasing elif numpy.alltrue(numpy.greater(first, second)): return CdDecreasing else: return CdNotMonotonic
# Extend axes. 'isreltime' is true iff # the axes are relative time axes # If allowgaps is true, allow gaps when extending linear vectors
[docs] def extend(self, axis, isreltime=0, allowgaps=0): # Set trylin true if should try to catenate linear vectors if self.dataRepresent == CdLinear: anode = self.data if axis.dataRepresent == CdLinear: bnode = axis.data trylin = 1 elif axis.length == 1: bnode = LinearDataNode(axis.data[0], 0.0, 1) trylin = 1 else: trylin = 0 elif self.length == 1: anode = LinearDataNode(self.data[0], 0.0, 1) if axis.dataRepresent == CdLinear: bnode = axis.data trylin = 1 elif axis.length == 1: bnode = LinearDataNode(axis.data[0], 0.0, 1) trylin = 1 else: trylin = 0 else: trylin = 0 if isreltime == 1: units1 = self.getExternalAttr('units') units2 = axis.getExternalAttr('units') else: units1 = units2 = None if trylin == 1: try: aindex = 0 alength = anode.length bindex = alength blength = bnode.length if isreltime == 1 and units1 and units2 and units1 != units2: rtime = cdtime.reltime(bnode.start, units2) offset = rtime.torel(units1).value bnode.start = bnode.start + offset else: offset = None linNode = anode.concatenate(bnode, allowgaps) except NotMonotonicError: # The dimensions cannot be extended as linear arrays, # so try to extend them as vectors pass else: # Extend the partition attribute if offset is not None: bindex = int(offset / linNode.delta + 0.5) if self.partition is None: partition = numpy.array( [aindex, aindex + alength, bindex, bindex + blength]) self.partition_length = alength + blength else: partition = numpy.concatenate( (self.partition, [bindex, bindex + blength])) self.partition_length = self.partition_length + blength self.setLinearData(linNode, partition) self.setExternalAttr('partition_length', self.partition_length) return self # Else get both axis vectors, concatenate # and check that the result is monotonic ar1 = self.getData() ar2 = axis.getData() aindex = 0 alength = len(ar1) bindex = alength blength = len(ar2) # Adjust array2 if relative time and units differ if isreltime == 1: if units1 and units2 and units1 != units2: rtime = cdtime.reltime(0.0, units2) delta = rtime.torel(units1).value ar2 = ar2 + delta ar = numpy.concatenate((ar1, ar2)) try: self.setData(ar) except NotMonotonicError: # Restore original array and resignal self.setData(ar1) raise NotMonotonicError(NotMonotonic + repr(ar)) # Extend the partition attribute if self.partition is None: self.partition = numpy.array( [aindex, aindex + alength, bindex, bindex + blength]) self.partition_length = alength + blength else: self.partition = numpy.concatenate( (self.partition, [bindex, bindex + blength])) self.partition_length = self.partition_length + blength self.setExternalAttr('partition', str(self.partition)) self.setExternalAttr('partition_length', self.partition_length) return self
def __len__(self): return len(self.data)
# Linear data element
[docs]class LinearDataNode(CdmsNode): validStartTypes = [ int, float, type( cdtime.comptime(0)), type( cdtime.reltime( 0, "hours"))] validDeltaTypes = [int, float, list] def __init__(self, start, delta, length): assert isinstance(start, numpy.floating) or isinstance(start, numpy.integer) or ( type(start) in self.validStartTypes), 'Invalid start argument: ' + repr(start) assert isinstance(start, numpy.floating) or isinstance(start, numpy.integer) or ( type(delta) in self.validDeltaTypes), 'Invalid delta argument: ' + repr(delta) assert isinstance( length, int), 'Invalid length argument: ' + repr(length) CdmsNode.__init__(self, "linear") self.delta = delta self.length = length self.start = start LinearDataNode.mapToExternal(self) # Get an indexed value def __getitem__(self, index): return self.start + index * self.delta # Map to external attributes
[docs] def mapToExternal(self): self.setExternalAttr("start", self.start) self.setExternalAttr("delta", self.delta) self.setExternalAttr("length", self.length)
# Equality of linear vectors
[docs] def equal(self, axis): return self.delta == axis.delta and self.length == axis.length and self.start == axis.start
# Closeness of linear vectors
[docs] def isClose(self, axis, eps): if eps == 0: return self.equal(axis) else: return self.delta == axis.delta and self.length == axis.length and abs( self.start - axis.start) <= abs(eps * self.start)
# Equality of linear vector and array
[docs] def equalVector(self, ar): diff = ar[1:] - ar[:-1] try: comp = numpy.alltrue( numpy.equal( (self.delta) * numpy.ones( self.length - 1), diff)) except ValueError: return 0 return comp
# Closeness of linear vector and array
[docs] def isCloseVector(self, ar, eps): if eps == 0: return self.equalVector(ar) diff = ar[1:] - ar[:-1] diff2 = self.delta * numpy.ones(self.length - 1) try: comp = numpy.alltrue( numpy.less_equal( numpy.absolute( diff2 - diff), numpy.absolute( eps * diff2))) except ValueError: return 0 return comp
# Return monotonicity: CdNotMonotonic, CdIncreasing, CdDecreasing, or # CdSingleton
[docs] def monotonicity(self): if self.length == 1: return CdSingleton elif self.delta > 0.0: return CdIncreasing elif self.delta < 0.0: return CdDecreasing else: return CdNotMonotonic
# Return a vector representation, given a CDMS datatype
[docs] def toVector(self, datatype): numericType = CdToNumericType.get(datatype) if numericType is None: raise CDMSError(InvalidDatatype + datatype) start = self.start delta = self.delta length = self.length if length > 1: stop = start + (length - 0.99) * delta if delta == 0.0: delta = 1.0 ar = numpy.arange(start, stop, delta, numericType) else: ar = numpy.array([start], numericType) return ar
# Concatenate linear arrays, preserving linearity # If allowgaps is set, don't require that the linear arrays be contiguous # Return a new linear node
[docs] def concatenate(self, linearNode, allowgaps=0): if self.length > 1 and linearNode.length > 1 and self.delta != linearNode.delta: raise NotMonotonicError(NotMonotonic + 'linear vector deltas do not match: %s,%s' % (repr(self.delta), repr(linearNode.delta))) if self.length > 1: delta = self.delta elif linearNode.length > 1: delta = linearNode.delta else: delta = linearNode.start - self.start if allowgaps == 0: if linearNode.start - self.start != self.length * delta: raise NotMonotonicError( NotMonotonic + 'linear vectors are not contiguous') length = self.length + linearNode.length return LinearDataNode(self.start, delta, length)
def __len__(self): return self.length
# Rectilinear lat-lon grid
[docs]class RectGridNode(CdmsNode): # Create a grid # All arguments are strings def __init__(self, id, latitude, longitude, gridtype=UnknownGridType, order="yx", mask=None): CdmsNode.__init__(self, "rectGrid", id) self.latitude = latitude self.longitude = longitude self.gridtype = gridtype self.mask = mask self.order = order RectGridNode.mapToExternal(self) # Map to external attributes
[docs] def mapToExternal(self): self.setExternalAttr('type', self.gridtype) self.setExternalAttr('latitude', self.latitude) self.setExternalAttr('longitude', self.longitude) self.setExternalAttr('order', self.order) if self.mask is not None: self.setExternalAttr('mask', self.mask)
# Link to an external element
[docs]class XLinkNode(CdmsNode): def __init__(self, id, uri, contentRole, content=''): CdmsNode.__init__(self, "xlink", id) self.uri = uri self.contentRole = contentRole self.content = content XLinkNode.mapToExternal(self) # Map to external attributes
[docs] def mapToExternal(self): self.setExternalAttr("href", self.uri, CdString) self.setExternalAttr("content-role", self.contentRole, CdString)
# Link to a document
[docs]class DocLinkNode(CdmsNode): def __init__(self, uri, content=''): CdmsNode.__init__(self, "doclink") self.uri = uri self.content = content DocLinkNode.mapToExternal(self) # Map to external attributes
[docs] def mapToExternal(self): self.setExternalAttr("href", self.uri, CdString)
# Domain
[docs]class DomainNode(CdmsNode): def __init__(self): CdmsNode.__init__(self, "domain")
# Domain element
[docs]class DomElemNode(CdmsNode): def __init__(self, name, start=None, length=None): CdmsNode.__init__(self, "domElem") self.name = name self.start = start self.length = length DomElemNode.mapToExternal(self) # Map to external attributes
[docs] def mapToExternal(self): self.setExternalAttr('name', self.name) if self.start is not None: self.setExternalAttr('start', self.start) if self.length is not None: self.setExternalAttr('length', self.length)
# Set the name
[docs] def setName(self, name): self.name = name self.setExternalAttr('name', self.name)
# Get the name
[docs] def getName(self): return self.name
# Write to a file, with formatting. # tablevel is the start number of tabs
[docs] def write(self, fd=None, tablevel=0, format=1): if fd is None: fd = sys.stdout if format: fd.write(tablevel * '\t') fd.write('<' + self.tag) for attname in list(self.attribute.keys()): (attval, datatype) = self.attribute[attname] # attvalstr = string.replace(str(attval),'"',"'") # Map " to ' attvalstr = _Illegal.sub( mapIllegalToEntity, str(attval)) # Map illegal chars to entities fd.write(' ' + attname + '="' + attvalstr + '"') fd.write('/>') if format: fd.write('\n')
# Attribute node - only used as a placeholder during parse and write # Attr nodes are not placed on the tree # # Two ways to create an Attr object: # (1) attr = AttrNode(name,value) # datatype = sometype # optionally, to override intrinsic type # (2) attr = AttrNode(name,None) # attr.setValueFromString(somestring,sometype)
[docs]class AttrNode(CdmsNode): def __init__(self, name, value=None): CdmsNode.__init__(self, "attr") if not (isinstance(value, int) or isinstance(value, numpy.integer) or isinstance(value, float) or isinstance(value, numpy.floating) or isinstance(value, string_types) or isinstance(value, type(None))): raise CDMSError('Invalid attribute type: ' + repr(value)) self.name = name self.value = value self.datatype = None # CDMS datatype, use getDatatype to retrieve self.content = '' # string content # Note: mapToExternal is not called at init time, must be called explicitly # if needed
[docs] def mapToExternal(self): self.attribute['name'] = (self.name, CdString) self.attribute['datatype'] = (self.getDatatype(), CdString) self.content = self.getValueAsString()
[docs] def getDatatype(self): if self.datatype: return self.datatype elif isinstance(self.value, string_types): return CdString elif isinstance(self.value, float) or isinstance(self.value, numpy.floating): return CdDouble elif isinstance(self.value, int) or isinstance(self.value, numpy.integer): return CdLong else: raise CDMSError('Invalid attribute type: ' + repr(self.value))
[docs] def getLength(self): return 1
# Map a string of a given datatype to a value # Returns ValueError if the conversion fails
[docs] def setValueFromString(self, valString, datatype): val = None if not isinstance(valString, string_types): raise CDMSError('input value is not a string') if datatype == CdString: val = valString elif datatype in (CdShort, CdInt, CdLong): try: val = int(valString) except ValueError: raise CDMSError('value is not an integer: ' + valString) elif datatype in (CdFloat, CdDouble): try: val = float(valString) except ValueError: raise CDMSError('value is not floating-point: ' + valString) self.value = val self.datatype = datatype return val
[docs] def getValueAsString(self): return str(self.value)
# Set content # This may be called multiple times, so append
[docs] def setContentFromString(self, content): self.content = self.content + content
# Write to a file, with formatting. # tablevel is the start number of tabs
[docs] def write(self, fd=None, tablevel=0, format=1): if fd is None: fd = sys.stdout if self.dtd: validAttrs = list(self.dtd.keys()) else: validAttrs = None if format: fd.write(tablevel * '\t') fd.write('<' + self.tag) # Write valid attributes for attname in list(self.attribute.keys()): if (validAttrs and (attname in validAttrs)) or (not validAttrs): (attval, datatype) = self.attribute[attname] # attvalstr = string.replace(str(attval),'"',"'") # Map " to ' attvalstr = _Illegal.sub( mapIllegalToEntity, str(attval)) # Map illegal chars to entities fd.write(' ' + attname + '="' + attvalstr + '"') fd.write('>') # Write content if self.content is not None: content = _Illegal.sub( mapIllegalToEntity, self.content) # Map illegal chars to entities fd.write(content) fd.write('</' + self.tag + '>') if format: fd.write('\n')
if __name__ == '__main__': # a = numpy.array([0.,4.,8.,12.,16.,20.,24.,28.]) # b = numpy.array([0.,4.,8.,12.,16.,20.,24.,28.]) # c = numpy.array([0.,4.,8.,12.,16.,20.,24.,28.,32.]) # aAxis = AxisNode('a',len(a),CdDouble,a) # bAxis = AxisNode('b',len(b),CdDouble,b) # cAxis = AxisNode('c',len(c),CdDouble,c) # d = LinearDataNode(0.,4.,8) # e = LinearDataNode(0.,4.,8) # f = LinearDataNode(0.,4.,9) # dAxis = AxisNode('d',8); dAxis.setLinearData(d) # eAxis = AxisNode('e',8); eAxis.setLinearData(e) # fAxis = AxisNode('f',9); fAxis.setLinearData(f) # print aAxis.equal(bAxis) # print aAxis.equal(cAxis) # print aAxis.equal(dAxis) # print aAxis.equal(fAxis) # print dAxis.equal(eAxis) # print dAxis.equal(fAxis) # g = numpy.array([0.,4.,8.,12.,16.,20.,24.,20.]) # h = numpy.array([ 28., 24., 20., 16., 12., 8., 4., 0.]) # j = LinearDataNode(0.,-4.,8) # k = numpy.array([4.]) # gAxis = AxisNode('g',len(g),CdDouble,g) # hAxis = AxisNode('h',len(h),CdDouble,h) # jAxis = AxisNode('j',len(j),CdDouble); jAxis.setLinearData(j) # kAxis = AxisNode('k',len(k),CdDouble,k) # print aAxis.monotonicity() # print hAxis.monotonicity() # print kAxis.monotonicity() # print gAxis.monotonicity() # print dAxis.monotonicity() # print jAxis.monotonicity() m = LinearDataNode(1, 2, 3) n = LinearDataNode(11, 2, 3) p = LinearDataNode(15, -4, 3) q = numpy.array([4., 2., 1.]) r = numpy.array([11., 9., 8.]) s = numpy.array([7.]) t = numpy.array([9.]) v = numpy.array([5.]) mAxis = AxisNode('m', len(m), CdLong) mAxis.setLinearData(m) nAxis = AxisNode('n', len(n), CdLong) nAxis.setLinearData(n) pAxis = AxisNode('p', len(p), CdLong) pAxis.setLinearData(p) qAxis = AxisNode('q', len(q), CdDouble, q) rAxis = AxisNode('r', len(r), CdDouble, r) sAxis = AxisNode('s', len(s), CdDouble, s) tAxis = AxisNode('t', len(t), CdDouble, t) vAxis = AxisNode('v', len(v), CdDouble, v) def printType(axis): if axis.dataRepresent == CdLinear: print('linear') else: print('vector') def testit(a, b): import copy x = copy.copy(a) print(x.extend(b).getData()) printType(x) # testit(mAxis,nAxis) # testit(mAxis,pAxis) # testit(qAxis,nAxis) # testit(qAxis,rAxis) # testit(mAxis,sAxis) # testit(mAxis,tAxis) # testit(sAxis,tAxis) # testit(mAxis,rAxis) # testit(vAxis,nAxis) # testit(sAxis,rAxis) # Errors: # testit(mAxis,nAxis)