# Source code for metacsv.core.internals


from __future__ import absolute_import, division, print_function, \
    with_statement, unicode_literals

import pandas as pd
import numpy as np
import re
from collections import OrderedDict
from pandas.core.base import FrozenList

from .exceptions import GraphIsCyclicError
from .._compat import string_types, has_iterkeys, iterkeys, has_iteritems, iteritems
from ..io import to_xarray, to_csv, to_pandas


class _BaseProperty(object):
    property_type = None  # overload
    repr_order = []

    def __init__(self, data=None, container=None):
        if data is None:
            self._data = None
        elif isinstance(data, _BaseProperty):
            self._data = data._data
        else:
            if isinstance(data, dict) or isinstance(data, OrderedDict):
                self._data = data
            else:
                raise TypeError

    def __repr__(self):
        return str(self)

    def __str__(self):
        truncate = lambda s: '\n'.join([l if len(l) < 80 else l[:75] + '...' for l in s.split('\n')])
        if self._data is not None and len(self._data) > 0:
            repr_str = '' if len(self._data) == 0 else self.property_type
            for props, prop_data in self._data.items():
                repr_str += '\n    {: <15} {}'.format(
                    str(props) + ':', prop_data)
            return truncate(repr_str)
        else:
            return '<Empty {}>'.format(self.property_type)

    def __iter__(self):
        if self._data is not None:
            for k, v in self._data.items():
                yield k, v

    def pop(self, key, *default):
        if len(default) > 1:
            raise ValueError(
                'pop() takes exactly 2 arguments ({} given)'.format(len(default) + 1))

        if self._data is not None:
            if len(default) == 0:
                return self._data.pop(key)
            else:
                return self._data.pop(key, default[0])
                
        else:
            if len(default) == 1:
                return default[0]

            else:
                raise KeyError(
                    '{} not yet assigned.'.format(self.property_type))

    def get(self, key, *default):
        if len(default) > 1:
            raise ValueError(
                'get() takes exactly 2 arguments ({} given)'.format(len(default) + 1))

        if self._data is not None:
            if len(default) == 0:
                return self._data.get(key)
            else:
                return self._data.get(key, default[0])

        else:
            if len(default) == 1:
                return default[0]

            else:
                raise KeyError(
                    '{} not yet assigned.'.format(self.property_type))

    def update(self, value):
        if self._data == None:
            self._data = {}

        if isinstance(value, _BaseProperty):
            self._data.update(value._data)
        elif has_iterkeys(value):
            if len(value) > 0:
                self._data.update(value)
        else:
            raise TypeError('Passed value is not iterable')

    def __getitem__(self, key):
        if self._data is None:
            raise KeyError('{} not yet assigned.'.format(self.property_type))
        return self._data[key]

    def __setitem__(self, key, value):
        if self._data is None:
            self._data = {}

        if isinstance(value, _BaseProperty):
            self._data[key] = value._data
        else:
            self._data[key] = value

    def __delitem__(self, key):
        if self._data is None:
            raise KeyError('{} not yet assigned.'.format(self.property_type))
        del self._data[key]

    def __getattr__(self, key):
        if key in self.__dict__:
            return self.__dict__[key]
        if '_data' in self.__dict__:
            if self.__dict__['_data'] != None:
                if key in self.__dict__['_data']:
                    return self.__dict__['_data'][key]
        raise AttributeError("'{}' object has no attribute '{}'".format(self.property_type, key))

    def __eq__(self, other):
        if hasattr(other, '_data'):
            return dict(self._data) == dict(other._data)
        if other is None and (self._data is None or len(self._data) == 0):
            return True
        elif has_iteritems(other):
            return dict(self._data) == dict(other)
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __contains__(self, key):
        if self._data is None:
            return False
        return key in self._data

    def __len__(self):
        if self._data is None:
            return 0
        return len(self._data)

    def items(self):
        if self._data is not None:
            for k, v in self._data.items():
                yield (k, v)

    def iteritems(self):
      return self.items()

    def copy(self):
        if self._data is not None:
            return type(self)(self._data.copy(), container=None)
        else:
            return type(self)()


class Attributes(_BaseProperty):
    """Dict-backed property labelled 'Attributes' in printed output."""

    property_type = 'Attributes'
class Variables(_BaseProperty):
    """Dict-backed property labelled 'Variables' in printed output.

    Values may be plain objects or mappings; mapping values are printed as
    an indented sub-block, one ``key value`` line per entry.
    """

    property_type = 'Variables'

    @staticmethod
    def parse_string_var(defn):
        """Split a ``'description [unit]'`` string into a dict.

        Parameters
        ----------
        defn : str
            Variable definition, optionally ending in a bracketed unit.

        Returns
        -------
        dict or str
            ``{'description': ...}`` plus ``'unit'`` when a non-empty
            bracketed unit is present.  If the pattern does not match,
            ``defn`` is returned unchanged.

        Raises
        ------
        TypeError
            If ``defn`` is not a string.
        """
        if not isinstance(defn, string_types):
            raise TypeError('parse_string_var only accepts string arguments')

        pattern = re.search(
            r'^(?P<desc>[^\[]+)(\s+\[(?P<unit>.*)\])?$', defn)

        if not pattern:
            return defn

        vardata = {'description': pattern.group('desc')}
        unit = pattern.group('unit')
        if unit:
            vardata['unit'] = unit

        return vardata

    def __str__(self):
        """Return the label followed by one entry per variable.

        Lines of 80 characters or more are cut to 75 characters plus '...'.
        """
        if not self._data:
            return '<Empty {}>'.format(self.property_type)

        def truncate(text):
            return '\n'.join(
                [l if len(l) < 80 else l[:75] + '...' for l in text.split('\n')])

        repr_str = self.property_type
        for props, prop_data in self._data.items():
            if has_iteritems(prop_data):
                # Mapping values go on their own indented lines.
                value = '\n' + '\n'.join(
                    [' '*8 + '{: <15} {}'.format(k, v)
                     for k, v in iteritems(prop_data)])
            else:
                value = prop_data
            # NOTE(review): this copy of the source came from a
            # whitespace-collapsed extraction; the leading indent inside this
            # format string may originally have been wider -- confirm upstream.
            repr_str += '\n {: <10} {}'.format(str(props) + ':', value)

        return truncate(repr_str)
class Coordinates(object):
    '''
    Manages coordinate system for MetaCSV data containers

    State kept on the instance:

    ``_coords``
        Ordered mapping of coordinate name to the list of coordinates it
        depends on (``None`` for a base coordinate), or ``None`` when no
        coordinates are defined.
    ``_base_coords``
        ``FrozenList`` of the coordinates that have no dependencies.
    ``_base_dependencies``
        dict mapping each coordinate to the set of base coordinates it
        ultimately depends on.
    ``_container``
        The data container these coordinates describe, or ``None``.
    '''

    property_type = 'Coordinates'

    def __init__(self, coords=None, container=None):
        if container is not None:
            if not isinstance(container, (Container, pd.DataFrame, pd.Series)):
                # ``pd.Panel`` no longer exists in recent pandas releases, so
                # look it up defensively instead of touching the attribute.
                panel_type = getattr(pd, 'Panel', None)
                if panel_type is not None and isinstance(container, panel_type):
                    raise NotImplementedError(
                        'Coordinates not implemented for panel data')
                raise TypeError(
                    '__init__ container argument must be a metacsv or pandas DataFrame or Series')

        self._container = container

        # Work on a copy so the caller's definition is never modified.
        if hasattr(coords, 'copy'):
            coords = coords.copy()

        self.__set__(coords)

    def __set__(self, coords):
        '''
        Replace the coordinate definition.

        Resets all state first; empty or ``None`` input leaves the object
        empty.  Otherwise the definition is parsed, coordinate columns are
        moved into the container index, and the result is validated against
        the container before being stored.
        '''
        self._coords = None
        self._base_coords = None
        self._base_dependencies = None

        if isinstance(coords, Coordinates) and (
                coords._coords is None or (len(coords._coords) == 0)):
            return
        elif coords is None or (len(coords) == 0):
            return
        elif isinstance(coords, Coordinates):
            _coords = coords._coords
            _base_coords = coords._base_coords
            _base_dependencies = coords._base_dependencies
        else:
            _coords, _base_coords, _base_dependencies = \
                self.parse_coords_definition(coords)

        self._send_coords_in_cols_to_index(_coords)
        self._validate_coords_against_data(coords=_coords)

        self._coords = _coords
        self._base_coords = _base_coords
        self._base_dependencies = _base_dependencies

    def __repr__(self):
        coords_str = 'Coordinates'
        if self._coords is not None:
            # Base coordinates first, then dependent ones.
            for base in self._base_coords:
                coords_str += '\n' + self._repr_coord(base, base=True)
            for coord in [c for c in self._coords if not c in self._base_coords]:
                coords_str += '\n' + self._repr_coord(coord, base=False)
            return coords_str
        else:
            return '<Empty {}>'.format(self.property_type)

    def __iter__(self):
        if self._coords is not None:
            for k in self._coords.keys():
                yield k

    # TODO:
    #   ensure compatability with PY3 and
    #   pd._compat utilities

    def items(self):
        '''Yield ``(coordinate, dependencies)`` pairs.'''
        if self._coords is not None:
            for k, v in self._coords.items():
                yield (k, v)

    def iteritems(self):
        for k, v in self.items():
            yield k, v

    def __eq__(self, other):
        '''
        Compare against another Coordinates, ``None``, or a mapping.

        Undefined coordinates equal ``None``, other undefined coordinates,
        and an empty mapping.
        '''
        if isinstance(other, Coordinates):
            if self._coords is None or other._coords is None:
                return self._coords is None and other._coords is None
            return ((dict(self._coords) == dict(other._coords))
                    and (self._base_coords == other._base_coords))

        elif other is None:
            return self._coords is None

        elif has_iteritems(other):
            _coords, _base_coords, _deps = self.parse_coords_definition(other)
            if self._coords is None:
                return not _coords
            return ((dict(self._coords) == _coords)
                    and (self._base_coords == _base_coords))

        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __getitem__(self, key):
        if self._coords is None:
            raise KeyError('Coordinate not yet defined')
        return self._coords[key]

    def __len__(self):
        if self._coords is None:
            return 0
        return len(self._coords)

    def __length_hint__(self):
        if self._coords is None:
            return 0
        if hasattr(self._coords, '__length_hint__'):
            return self._coords.__length_hint__()
        return self.__len__()

    # The method was historically misspelled; keep the old name working.
    __lenth_hint__ = __length_hint__

    def _repr_coord(self, coord, base=False, maxlen=50):
        '''
        Format one line of the ``repr`` for ``coord``.

        When a container is attached, the index dtype and as many index
        values as fit within ``maxlen`` characters are appended.
        '''
        datastr = ''
        if self._container is not None:
            if isinstance(self._container.index, pd.MultiIndex):
                coord_data = self._container.index.levels[
                    self._container.index.names.index(coord)]
            else:
                coord_data = self._container.index.values

            datastr += ' {} '.format(coord_data.dtype)

            for i, ind in enumerate(coord_data):
                if len(datastr) + len(str(ind)) + 5 > maxlen:
                    datastr += '...'
                    break
                if i > 0:
                    datastr += ', '
                datastr += '{}'.format(ind)

        # NOTE(review): this copy of the source came from a
        # whitespace-collapsed extraction; the padding inside the literals
        # below may originally have been wider -- confirm upstream.
        coordstr = (' * ' if base else ' ')
        coordstr += ('{: <10}'.format(coord))
        coordstr += (' ({})'.format(coord if base else ','.join(
            list(map(str, self._coords[coord])))))
        coordstr += datastr

        return coordstr

    def copy(self):
        '''Return a detached copy (no container) of these coordinates.'''
        if self._coords is None:
            return type(self)()
        return type(self)(self._coords.copy(), container=None)

    @property
    def base_coords(self):
        return self._base_coords

    @staticmethod
    def parse_coords_definition(coords=None):
        '''
        Validate coords to test for cyclic graph

        Parameters
        ----------
        coords : str, iterable of names, or mapping
            A single coordinate name, a sequence of names (all treated as
            base coordinates), or a mapping from each coordinate to ``None``,
            a single coordinate name, or an iterable of coordinate names it
            depends on.  The argument is not modified.

        Returns
        -------
        tuple
            ``(coords, base_coords, base_dependencies)``; all three are
            ``None`` when ``coords`` is ``None`` or an undefined
            ``Coordinates`` object.

        Raises
        ------
        GraphIsCyclicError
            If the dependency graph contains a cycle.
        '''
        if coords is None:
            return None, None, None

        # Undefined Coordinates objects compare equal to None and were
        # historically handled by the ``None`` branch; keep that result.
        if isinstance(coords, Coordinates) and coords._coords is None:
            return None, None, None

        if isinstance(coords, string_types):
            return (OrderedDict([(coords, None)]),
                    FrozenList([coords]),
                    {coords: set([coords])})

        elif not has_iterkeys(coords):
            if isinstance(coords, Coordinates):
                coords = coords._coords

            coords = OrderedDict(
                list(zip(list(coords), [None for _ in range(len(coords))])))

            return (coords,
                    FrozenList(coords.keys()),
                    {c: set([c]) for c in coords.keys()})

        # The traversal below consumes its work list; use a private copy so
        # the caller's mapping is left intact.
        coords = OrderedDict((k, coords[k]) for k in iterkeys(coords))

        base_coords = []
        dependencies = OrderedDict([])
        base_deps = {}
        visited = set()

        def find_coord_dependencies(coord):
            if coord in visited:
                # Seen but not finished: we came back to it through its own
                # dependencies, so the graph has a cycle.
                if coord not in dependencies:
                    raise GraphIsCyclicError
                return

            deps = coords.pop(coord)

            if deps is None:
                base_coords.append(coord)
                dependencies[coord] = None
                base_deps[coord] = set([coord])
                visited.add(coord)

            elif isinstance(deps, string_types):
                visited.add(coord)
                find_coord_dependencies(deps)
                dependencies[coord] = set([deps])
                base_deps[coord] = base_deps[deps]

            else:
                visited.add(coord)
                dependencies[coord] = set()
                base_deps[coord] = set()

                for ele in deps:
                    find_coord_dependencies(ele)
                    dependencies[coord].add(ele)
                    base_deps[coord] |= base_deps[ele]

        while len(coords) > 0:
            find_coord_dependencies(next(iterkeys(coords)))

        # Convert from sets to lists
        for k, v in list(dependencies.items()):
            if v is not None:
                dependencies[k] = list(v)

        return dependencies, FrozenList(base_coords), base_deps

    def _get_coords_from_data(self):
        '''
        Build a coordinate definition from the container's index names.

        Unnamed levels are named in place on the container: a single
        unnamed index becomes ``'index'``, otherwise ``'level_<i>'``.
        '''
        if not pd.isnull(self._container.index.names).any():
            pass

        elif (len(self._container.index.names) == 1
                and self._container.index.names[0] is None):
            self._container.index.names = ['index']

        else:
            self._container.index.names = [
                coord if coord is not None else 'level_{}'.format(i)
                for i, coord in enumerate(self._container.index.names)]

        coords, base_coords, base_dependencies = self.parse_coords_definition(
            self._container.index.names)

        return coords, base_coords, base_dependencies

    def set_coords_from_data(self):
        '''Replace the current definition with one derived from the index.'''
        self._coords, self._base_coords, self._base_dependencies = \
            self._get_coords_from_data()

    def update(self, coords=None):
        '''
        Merge ``coords`` into the current definition and re-validate.

        With no argument the current definition is re-applied; if there is
        none, it is derived from the container index.

        Raises
        ------
        ValueError
            If there is nothing to update from and no container is attached.
        '''
        # This needs some testing!!

        if coords is None:
            coords = self._coords

        if coords is None:
            if self._container is None:
                raise ValueError(
                    'Cannot update coordinates from data unless assigned to a container')

            coords = self._get_coords_from_data()[0]

        self._prune()

        if (not hasattr(self, '_coords')) or self._coords is None:
            orig_coords = OrderedDict()
        else:
            orig_coords = self._coords.copy()

        for k, v in coords.items():
            orig_coords[k] = v

        self.__set__(orig_coords)

    def _send_coords_in_cols_to_index(self, coords=None, container=None):
        '''
        Move coordinates found among the container's columns into its index.

        Modifies the container in place.  Does nothing when there are no
        coordinates, no container, or the container has no columns.
        '''
        coords = coords if coords is not None else self._coords
        if coords is None:
            return

        container = container if container is not None else self._container
        if container is None:
            return

        if hasattr(container, 'columns') and hasattr(container, 'set_index'):
            # A lone unnamed index is a default index: replace it rather
            # than append to it.
            append = not (len(container.index.names) == 1
                          and container.index.names[0] is None)

            set_coords = [c for c in coords if (
                c not in container.index.names) and (c in container.columns)]

            if len(set_coords) > 0:
                container.set_index(set_coords, inplace=True, append=append)

    @staticmethod
    def _get_available_coords(container):
        '''Return the non-``None`` names on the container's index and columns.'''
        available_coords = []
        for dim in ['index', 'columns']:
            if hasattr(container, dim):
                available_coords.extend(
                    [i for i in getattr(container, dim).names if i is not None])

        return available_coords

    def _prune(self, coords=None, container=None):
        '''
        Drop, in place, coordinates that are not present on the container.

        Returns the pruned mapping, or ``None`` when there are no
        coordinates or no container to check against.
        '''
        coords = coords if coords is not None else self._coords
        if coords is None:
            return

        container = container if container is not None else self._container
        if container is None:
            return

        available_coords = self._get_available_coords(container)

        # Snapshot the keys: the mapping is modified inside the loop.
        for c in list(coords):
            if c not in available_coords:
                coords.pop(c)

        return coords

    def _validate_coords_against_data(self, coords=None, container=None):
        '''
        Check that coordinates and container index names match one-to-one.

        Raises
        ------
        AssertionError
            If a coordinate is missing from the index or an index level is
            missing from the coordinates.  Raised explicitly so the check is
            not stripped when Python runs with ``-O``.
        '''
        if coords is None:
            return

        container = container if container is not None else self._container
        if container is None:
            return

        for c in coords.keys():
            if c not in container.index.names:
                raise AssertionError(
                    "Coordinate '{c}' not found in container index".format(c=c))

        for c in container.index.names:
            if c not in coords:
                raise AssertionError(
                    "Data index '{c}' not found in supplied coordinates".format(c=c))
class Container(object):
    '''
    Base class for metacsv Container objects

    Parameters
    ----------
    coords : dict
        Container coordinates

    variables : dict
        Variable-specific attributes

    attrs : dict
        Container attributes

    Returns
    -------
    container : object
        a :py:class:`~metacsv.Series`, :py:class:`~metacsv.DataFrame`,
        or :py:class:`~metacsv.Panel` object

    Notes
    -----
    Methods here read ``self.shape``, ``self.index`` and
    ``self.pandas_parent``, none of which this class defines.
    '''

    def __init__(self, coords=None, variables=None, attrs=None, *args, **kwargs):
        self.coords = coords
        self.attrs = attrs
        self.variables = variables

    # Container Properties

    # coords

    @property
    def coords(self):
        '''Coordinates property of a metacsv Container'''
        if not hasattr(self, '_coords'):
            self._coords = Coordinates()
        return self._coords

    @coords.setter
    def coords(self, value):
        if value is None:
            self._coords = Coordinates()
        else:
            self._coords = Coordinates(value, container=self)

    @coords.deleter
    def coords(self):
        self._coords = None

    @property
    def base_coords(self):
        '''Base coordinates of the container, or ``None`` if none are set.'''
        if not hasattr(self, '_coords'):
            self.coords = Coordinates()

        # Deliberately ``==`` rather than ``is``: Coordinates.__eq__ treats an
        # undefined coordinate set as equal to None.
        if self.coords == None:
            return None

        return self._coords._base_coords

    # attrs

    @property
    def attrs(self):
        '''Attributes property of a metacsv Container'''
        if not hasattr(self, '_attrs'):
            self._attrs = Attributes()
        return self._attrs

    @attrs.setter
    def attrs(self, value):
        if value is None:
            self._attrs = Attributes()
        else:
            self._attrs = Attributes(value, container=self)

    @attrs.deleter
    def attrs(self):
        self._attrs = None

    # variables

    @property
    def variables(self):
        '''Variables property of a metacsv Container'''
        if not hasattr(self, '_variables'):
            self._variables = Variables()
        return self._variables

    @variables.setter
    def variables(self, value):
        if value is None:
            self._variables = Variables()
        else:
            self._variables = Variables(value, container=self)

    @variables.deleter
    def variables(self):
        self._variables = None

    # Special Container Methods

    def add_coords(self):
        '''Define coordinates from the names of the container's index.'''
        if self.coords == None:
            self.coords = Coordinates(container=self)

        # NOTE(review): this copy of the source lost its indentation; whether
        # the call below belongs inside the ``if`` above should be confirmed
        # against upstream.
        self.coords.set_coords_from_data()

    def _get_coord_data_from_index(self, coord):
        return self.index.get_level_values(coord)

    @staticmethod
    def get_unique_multiindex(series):
        '''Return ``series`` restricted to the first row of each unique index value.'''
        return series.iloc[np.unique(series.index.values, return_index=True)[1]]

    @staticmethod
    def stringify_index_names(series):
        '''Convert the index names of ``series`` to strings, in place.'''
        series.index.names = list(map(str, series.index.names))
        return series

    @staticmethod
    def strip_special_attributes(args, kwargs):
        '''
        Separate metacsv metadata from constructor keyword arguments.

        ``attrs``, ``coords`` and ``variables`` are popped from ``kwargs``
        (which is modified in place).  ``coords`` and ``variables`` nested
        inside ``attrs`` are merged with the top-level ones, the top-level
        value taking precedence.

        Returns
        -------
        tuple
            ``(args, kwargs, special)`` where ``special`` holds the non-empty
            entries among ``'coords'``, ``'variables'`` and ``'attrs'``.
        '''
        attrs = kwargs.pop('attrs', None)
        # ``attrs=None`` may be passed explicitly; treat it like "not given".
        attrs = {} if attrs is None else attrs.copy()

        def update_property(p_data, data, func=lambda x: x):
            if hasattr(data, 'copy'):
                data = data.copy()

            parsed = func(data)
            # ``!=`` on purpose: property objects compare equal to None when
            # they are empty.
            if parsed != None:
                p_data.update(parsed)

        def strip_property(prop, func=lambda x: x):
            p_data = {}
            update_property(p_data, attrs.pop(prop, {}), func)
            update_property(p_data, kwargs.pop(prop, {}), func)

            if len(p_data) == 0:
                p_data = None

            return p_data

        coords = strip_property(
            'coords', lambda x: Coordinates.parse_coords_definition(x)[0])
        variables = strip_property('variables')

        special = {}

        if (coords is not None) and (len(coords) > 0):
            special['coords'] = coords

        if (variables is not None) and (len(variables) > 0):
            special['variables'] = variables

        if (attrs is not None) and (len(attrs) > 0):
            special['attrs'] = attrs

        return args, kwargs, special

    # Container formatting

    def _print_format(self):
        '''Return the header line, the pandas rendering, and non-empty metadata.'''
        metacsv_str = '<{} {}>'.format(
            type(self).__module__ + '.' + type(self).__name__, self.shape)

        data_str = self.pandas_parent.__str__(self)

        # ``!= None`` filters out empty properties via their __ne__.
        postscript = '\n'.join(
            [str(p) for p in [self.coords, self.variables, self.attrs] if p != None])

        return (metacsv_str + '\n' + data_str
                + ('\n\n' if len(postscript) > 0 else '') + postscript)

    def __repr__(self):
        return str(self)

    def __str__(self):
        return self._print_format()

    # Container conversion & I/O

    def to_csv(self, fp, header_file=None, *args, **kwargs):
        '''
        Write to a metacsv-formatted csv

        Parameters
        ----------
        fp : str
            Path to which to write the metacsv-formatted CSV

        header_file : str_or_buffer
            A separate metacsv-formatted header file

        *args :
            passed to pandas.to_csv

        **kwargs :
            passed to pandas.to_csv

        Example
        -------

        .. code-block:: python

            >>> from metacsv import DataFrame
            >>> import numpy as np, pandas as pd
            >>> np.random.seed(1)
            >>>
            >>> DataFrame(
            ...     pd.DataFrame(np.random.random((3,4))),
            ...     attrs={'author': 'my name'}
            ... ).to_csv('my-metacsv-data.csv')

        '''
        # Forward the caller's header_file (it used to be dropped).
        to_csv.metacsv_to_csv(self, fp, header_file=header_file, *args, **kwargs)

    def to_header(self, fp):
        '''
        Write attributes directly to a metacsv-formatted header file

        Parameters
        ----------
        fp : str
            Path to which to write the metacsv-formatted header file

        Example
        -------

        .. code-block:: python

            >>> from metacsv import DataFrame
            >>> import numpy as np
            >>> np.random.seed(1)
            >>>
            >>> df = DataFrame(
            ...     np.random.random((3,4)),
            ...     columns=['col'+str(i) for i in range(4)])
            ...
            >>> df.attrs={'author': 'my name'}
            >>> df.to_header('mycsv.header')

        '''
        to_csv.metacsv_to_header(
            fp, attrs=self.attrs, coords=self.coords, variables=self.variables)

    def to_pandas(self):
        '''
        Strip metacsv special attributes and return as a pandas Series,
        DataFrame, or Panel

        Example
        -------

        .. code-block:: python

            >>> from metacsv import DataFrame
            >>> import numpy as np, pandas as pd
            >>> np.random.seed(1)
            >>>
            >>> df = DataFrame(
            ...     np.random.random((3,4)),
            ...     columns=['col'+str(i) for i in range(4)])
            ...
            >>> df.index = pd.MultiIndex.from_tuples(
            ...     [('a','X'),('b','Y'),('c','Z')],
            ...     names=['abc','xyz'])
            ...
            >>> df.attrs={'author': 'my name'}
            >>> df.coords = {'abc': None, 'xyz': ['abc']}
            >>> df # doctest: +NORMALIZE_WHITESPACE
            <metacsv.core.containers.DataFrame (3, 4)>
                         col0      col1      col2      col3
            abc xyz
            a   X    0.417022  0.720324  0.000114  0.302333
            b   Y    0.146756  0.092339  0.186260  0.345561
            c   Z    0.396767  0.538817  0.419195  0.685220
            <BLANKLINE>
            Coordinates
              * abc        (abc) object a, b, c
                xyz        (abc) object X, Y, Z
            Attributes
                author:         my name
            >>> df.to_pandas() # doctest: +NORMALIZE_WHITESPACE
                         col0      col1      col2      col3
            abc xyz
            a   X    0.417022  0.720324  0.000114  0.302333
            b   Y    0.146756  0.092339  0.186260  0.345561
            c   Z    0.396767  0.538817  0.419195  0.685220

        '''
        return self.pandas_parent(self)

    def to_xarray(self):
        '''
        Convert to an xarray object: a DataArray for one-dimensional
        containers and a Dataset for two-dimensional ones

        .. note ::

            to_xarray is not yet implemented for Panel data.

        Example
        -------

        .. code-block:: python

            >>> from metacsv import DataFrame
            >>> import numpy as np, pandas as pd
            >>> np.random.seed(1)
            >>>
            >>> df = DataFrame(np.random.random((3,4)), columns=['col'+str(i) for i in range(4)])
            >>> df.index = pd.MultiIndex.from_tuples([('a','X'),('b','Y'),('c','Z')], names=['abc','xyz'])
            >>> df.attrs={'author': 'my name'}
            >>> df.coords = {'abc': None, 'xyz': ['abc']}
            >>> df # doctest: +NORMALIZE_WHITESPACE
            <metacsv.core.containers.DataFrame (3, 4)>
                         col0      col1      col2      col3
            abc xyz
            a   X    0.417022  0.720324  0.000114  0.302333
            b   Y    0.146756  0.092339  0.186260  0.345561
            c   Z    0.396767  0.538817  0.419195  0.685220
            <BLANKLINE>
            Coordinates
              * abc        (abc) object a, b, c
                xyz        (abc) object X, Y, Z
            Attributes
                author:         my name
            >>> df.to_xarray() # doctest: +SKIP
            <xarray.Dataset>
            Dimensions:  (abc: 3)
            Coordinates:
              * abc      (abc) object 'a' 'b' 'c'
                xyz      (abc) object 'X' 'Y' 'Z'
            Data variables:
                col0     (abc) float64 0.417 0.1468 0.3968
                col1     (abc) float64 0.7203 0.09234 0.5388
                col2     (abc) float64 0.0001144 0.1863 0.4192
                col3     (abc) float64 0.3023 0.3456 0.6852
            Attributes:
                author: my name

        '''
        if len(self.shape) == 1:
            return to_xarray.metacsv_series_to_dataarray(self)
        elif len(self.shape) == 2:
            return to_xarray.metacsv_dataframe_to_dataset(self)
        elif len(self.shape) > 2:
            raise NotImplementedError(
                'to_xarray not yet implemented for Panel data')

    def to_dataarray(self):
        '''
        Convert to an xArray.DataArray

        .. note ::

            If a DataFrame is passed, columns will be stacked and treated
            as coordinates. ``to_dataarray`` is not yet implemented for
            Panel data.

        Example
        -------

        .. code-block:: python

            >>> from metacsv import DataFrame
            >>> import numpy as np
            >>> np.random.seed(1)
            >>>
            >>> df = DataFrame(
            ...     np.random.random((3,4)),
            ...     index=list('ABC'),
            ...     attrs={'author': 'my name'})
            ...
            >>> df.to_dataarray() # doctest: +SKIP
            <xarray.DataArray (ind_0: 3, coldim_0: 4)>
            array([[  4.17022005e-01,   7.20324493e-01,   1.14374817e-04,
                      3.02332573e-01],
                   [  1.46755891e-01,   9.23385948e-02,   1.86260211e-01,
                      3.45560727e-01],
                   [  3.96767474e-01,   5.38816734e-01,   4.19194514e-01,
                      6.85219500e-01]])
            Coordinates:
              * ind_0     (ind_0) object 'A' 'B' 'C'
              * coldim_0  (coldim_0) int64 0 1 2 3
            Attributes:
                author: my name

        '''
        if len(self.shape) == 1:
            return to_xarray.metacsv_series_to_dataarray(self)
        elif len(self.shape) == 2:
            return to_xarray.metacsv_dataframe_to_dataarray(self)
        elif len(self.shape) > 2:
            raise NotImplementedError(
                'to_dataarray not yet implemented for Panel data')

    def to_dataset(self):
        '''
        Convert to an xArray.Dataset

        .. note ::

            If a Series is passed, the variable will be named 'data'.
            ``to_dataset`` is not yet implemented for Panel data.

        Example
        -------

        .. code-block:: python

            >>> from metacsv import DataFrame
            >>> import numpy as np
            >>> np.random.seed(1)
            >>>
            >>> df = DataFrame(
            ...     np.random.random((3,4)),
            ...     attrs={'author': 'my name'})
            ...
            >>> df.to_dataset()
            <xarray.Dataset>
            Dimensions:  (index: 3)
            Coordinates:
              * index    (index) int64 0 1 2
            Data variables:
                0        (index) float64 0.417 0.1468 0.3968
                1        (index) float64 0.7203 0.09234 0.5388
                2        (index) float64 0.0001144 0.1863 0.4192
                3        (index) float64 0.3023 0.3456 0.6852
            Attributes:
                author: my name

        '''
        if len(self.shape) == 1:
            return to_xarray.metacsv_series_to_dataset(self)
        elif len(self.shape) == 2:
            return to_xarray.metacsv_dataframe_to_dataset(self)
        elif len(self.shape) > 2:
            raise NotImplementedError(
                'to_dataset not yet implemented for Panel data')

    def to_netcdf(self, fp):
        '''
        Convert to a NetCDF file

        .. note ::

            If a Series is passed, the variable will be named 'data'.
            ``to_netcdf`` is not yet implemented for Panel data.

        Parameters
        ----------
        fp : string_or_buffer
            The filepath or file object to be written

        Example
        -------

        .. code-block:: python

            >>> from metacsv import DataFrame
            >>> import numpy as np
            >>> np.random.seed(1)
            >>>
            >>> df = DataFrame(
            ...     np.random.random((3,4)),
            ...     columns=list('ABCD'),
            ...     attrs={'author': 'my name'})
            ...
            >>> df.to_netcdf('test.nc')

        .. code-block:: python

            >>> import xarray as xr
            >>> xr.open_dataset('test.nc')
            <xarray.Dataset>
            Dimensions:  (index: 3)
            Coordinates:
              * index    (index) int64 0 1 2
            Data variables:
                A        (index) float64 0.417 0.1468 0.3968
                B        (index) float64 0.7203 0.09234 0.5388
                C        (index) float64 0.0001144 0.1863 0.4192
                D        (index) float64 0.3023 0.3456 0.6852
            Attributes:
                author: my name

        '''
        self.to_dataset().to_netcdf(fp)