# -*- coding: utf-8 -*-
"""
Core functionality
"""
import ast
import collections
from future.builtins import str
import h5py
import numpy as np
import os
import re
from subprocess import call
import warnings
from . import lib
# deprecation warnings are printed to sys.stdout
warnings.simplefilter('default', category=DeprecationWarning)

# check whether quantities is available
try:
    import quantities as pq
    quantities_found = True
except ImportError:
    quantities_found = False

# make sure a sufficiently recent h5py version is available
h5py_version = h5py.version.version_tuple
# NOTE: kept only for backward compatibility with external users of this
# name. Do not use it for ordering: concatenating the components as digits
# ('2' + '10' + '0' -> 2100) does not order versions correctly.
h5py_version_int = int('{}{}{}'.format(h5py_version.major,
                                       h5py_version.minor,
                                       h5py_version.bugfix))
# compare component-wise as tuples; the concatenated integer mis-orders
# versions such as 2.10.0 vs 2.3.1
if (h5py_version.major, h5py_version.minor, h5py_version.bugfix) < (2, 3, 1):
    raise ImportError("Using h5py version {version}. Version must "
                      "be >= 2.3.1".format(version=h5py.version.version))
def save(filename, d, write_mode='a', overwrite_dataset=False,
         resize=False, path=None, dict_label='', compression=None):
    """
    Save a dictionary to an hdf5 file.

    Parameters
    ----------
    filename : string
        The file name of the hdf5 file.
    d : dict
        The dictionary to be stored.
    write_mode : {'a', 'w'}, optional
        Analog to normal file handling in python. Defaults to 'a'.
    overwrite_dataset : bool, optional
        Whether datasets should be overwritten if already existing.
        Defaults to False.
    resize : bool, optional
        If True, the hdf5 file is resized after writing all data,
        may reduce file size. Uses h5repack (see
        https://www.hdfgroup.org/HDF5/doc/RM/Tools.html#Tools-Repack).
        Caution: slows down writing. Defaults to False.
    path : string, optional
        If not empty, the dictionary is stored under the given path in the
        hdf5 file, with levels separated by '/'.
        For instance, path='test/trial/spiketrains'. Defaults to None.
    dict_label : string, optional
        Deprecated alias for `path`; will be removed in the next release.
    compression : {'gzip', 'szip', 'lzf', 0,...,10}, optional
        Compression strategy to reduce file size. An integer >0, <=10 leads
        to usage of gzip, indicating the level of compression.
        'gzip' is recommended.
        See http://docs.h5py.org/en/latest/high/dataset.html for details.
        Caution: This slows down writing and loading of data.
        Attention: Will be ignored for scalar data.

    Returns
    -------
    None

    Examples
    --------
    >>> d = {}
    >>> d['a'] = {'a1': [1, 2, 3], 'a2': 4., 'a3': {'a31': 'Test'}}
    >>> d['b'] = 'string'
    >>> import h5py_wrapper as h5w
    >>> h5w.save('example.h5', d)
    """
    try:
        f = h5py.File(filename, write_mode)
    except IOError:
        # include the offending filename in the message
        raise IOError("unable to create {filename} (File "
                      "accessability: Unable to open "
                      "file)".format(filename=filename))
    else:
        try:
            if dict_label:
                warnings.warn("Deprecated argument dict_label provided. "
                              "dict_label will be removed in the next release. "
                              "Please use path instead.",
                              DeprecationWarning)
                if path is not None:
                    raise ValueError("dict_label and path must not "
                                     "be defined simultaneously.")
                path = dict_label
            if path:
                base = f.require_group(path)
                _dict_to_h5(f, d, overwrite_dataset, parent_group=base,
                            compression=compression)
            else:
                _dict_to_h5(f, d, overwrite_dataset, compression=compression)
        finally:  # make sure file is closed even if an exception is raised
            fname = f.filename
            f.close()
    if overwrite_dataset is True and resize is True:
        # repack to reclaim space freed by overwritten datasets
        call(['h5repack', '-i', fname, '-o', fname + '_repack'])
        call(['mv', fname + '_repack', fname])
def load(filename, path='', lazy=False):
    """
    Loads a dictionary from an hdf5 file.

    Parameters
    ----------
    filename : string
        The file name of the hdf5 file.
    path : string, optional
        If not empty, specifies a path to access deeper levels in the
        hdf5 file.
    lazy : boolean, optional
        If True, only keys from all levels of the dictionary are loaded
        with values. Defaults to False.

    Returns
    -------
    dictionary : dict
        Dictionary from the hdf5 file.

    Examples
    --------
    >>> d = {}
    >>> d['a'] = {'a1': [1, 2, 3], 'a2': 4., 'a3': {'a31': 'Test'}}
    >>> d['b'] = 'string'
    >>> import h5py_wrapper as h5w
    >>> h5w.save('example_load.h5', d, overwrite_dataset=True)
    >>> h5w.load('example_load.h5')
    {u'a': {u'a1': array([1, 2, 3]), u'a3': {u'a31': 'Test'}, u'a2': 4.0}, u'b': 'string'}
    """
    try:
        f = h5py.File(filename, 'r')
    except IOError:
        # include the offending filename in the message
        raise IOError("unable to open {filename} (File accessability: "
                      "Unable to open file)".format(filename=filename))
    else:
        try:
            if not path:
                obj = f
            else:
                try:
                    obj = f[path]
                except KeyError:
                    raise KeyError("unable to open {filename}/{path} "
                                   "(Key accessability: Unable to access "
                                   "key)".format(filename=filename, path=path))
            _, d = _dict_from_h5(obj, lazy=lazy)
        finally:  # make sure file is closed even if an exception is raised
            f.close()
    return d
# ______________________________________________________________________________
# Auxiliary functions
def _dict_to_h5(f, d, overwrite_dataset, compression=None, parent_group=None):
    """
    Recursively adds the dictionary `d` to the hdf5 file `f`.

    Sub-dictionaries become hdf5 groups; every other value is delegated to
    `_create_dataset`. Raises KeyError if a dataset exists and
    `overwrite_dataset` is False.
    """
    if parent_group is None:
        parent_group = f.parent
    # collections.MutableMapping was removed from the collections namespace
    # in Python 3.10; collections.abc is not available on Python 2
    try:
        mapping_type = collections.abc.MutableMapping
    except AttributeError:  # Python 2
        mapping_type = collections.MutableMapping
    for key, value in d.items():
        if isinstance(value, mapping_type):
            group_name = os.path.join(parent_group.name, str(key))
            group = f.require_group(group_name)
            _dict_to_h5(f, value, overwrite_dataset, parent_group=group,
                        compression=compression)
            # explicitly store type of key
            group.attrs['_key_type'] = type(key).__name__
        else:
            if str(key) not in parent_group:
                _create_dataset(parent_group, key, value,
                                compression=compression)
            elif overwrite_dataset is True:
                del parent_group[str(key)]
                _create_dataset(parent_group, key, value,
                                compression=compression)
            else:
                # str(key): non-string keys would break os.path.join here
                raise KeyError("Dataset {key} already "
                               "exists.".format(key=os.path.join(
                                   parent_group.name, str(key))))
def _create_dataset(parent_group, key, value, compression=None):
    """
    Creates the dataset for `value` under `key` in `parent_group`.

    Stores the key and value types in the dataset attributes so that
    loading can reconstruct the original python types. NoneType is stored
    as the string 'None'; ragged 2d sequences are flattened and stored
    together with the original sub-array lengths.
    """
    # collections.Iterable was removed from the collections namespace
    # in Python 3.10; collections.abc is not available on Python 2
    try:
        iterable_type = collections.abc.Iterable
    except AttributeError:  # Python 2
        iterable_type = collections.Iterable
    if value is None:  # h5py cannot store NoneType.
        dataset = parent_group.create_dataset(
            str(key), data='None', compression=compression)
    elif isinstance(value, (list, np.ndarray, tuple)):
        if np.array(value).dtype.name == 'object':
            # We store 2d arrays with unequal dimensions by reducing
            # them to a 1d array and additionally storing the original
            # shape. This does not work for more than two dimensions.
            if len(np.shape(value)) > 1:
                # str(key): non-string keys would break os.path.join here
                raise ValueError("Dataset {key} has an unsupported "
                                 "format.".format(key=os.path.join(
                                     parent_group.name, str(key))))
            else:
                oldshape = np.array([len(x) for x in value])
                value_types = lib.convert_iterable_to_numpy_array(
                    [type(x).__name__ for x in value])
                data_reshaped = np.hstack(value)
                dataset = parent_group.create_dataset(
                    str(key), data=data_reshaped, compression=compression)
                dataset.attrs['oldshape'] = oldshape
                dataset.attrs['custom_shape'] = True
                dataset.attrs['custom_value_types'] = value_types
        elif quantities_found and isinstance(value, pq.Quantity):
            dataset = parent_group.create_dataset(str(key), data=value)
            dataset.attrs['_unit'] = value.dimensionality.string
        else:
            dataset = parent_group.create_dataset(
                str(key), data=lib.convert_iterable_to_numpy_array(value),
                compression=compression)
    # ignore compression argument for scalar datasets
    elif not isinstance(value, iterable_type):
        dataset = parent_group.create_dataset(str(key), data=value)
    else:
        dataset = parent_group.create_dataset(
            str(key), data=value, compression=compression)
    # explicitly store type of key and value
    dataset.attrs['_key_type'] = type(key).__name__
    dataset.attrs['_value_type'] = type(value).__name__
def _dict_from_h5(f, lazy=False):
    """
    Recursively loads the dictionary from the hdf5 file `f`.

    Converts all datasets to numpy types and returns a
    (name, value-or-subdict) pair for the given group or dataset.
    """
    name = _evaluate_key(f)
    # use the named constant instead of the magic number 5
    if h5py.h5i.get_type(f.id) == h5py.h5i.DATASET:
        return name, _load_dataset(f, lazy)
    d = {}
    for obj in f.values():
        sub_name, sub_d = _dict_from_h5(obj, lazy=lazy)
        d[sub_name] = sub_d
    return name, d
def _load_dataset(f, lazy=False):
    """
    Loads the dataset `f` and returns its value.

    If `lazy` is True, returns None without reading any data.

    Raises
    ------
    KeyError
        If the dataset carries no '_value_type' attribute (files written
        by old release versions).
    """
    if lazy:
        return None
    try:
        value_type = f.attrs['_value_type']
    except KeyError:
        raise KeyError("No value type stored. This file has "
                       "probably been created with a previous release version. "
                       "Please use the conversion script to convert your "
                       "file.")
    if isinstance(value_type, bytes):
        value_type = str(value_type, 'utf-8')
    if value_type == 'NoneType':
        return None
    if 'custom_shape' in f.attrs:
        return _load_custom_shape(f)
    # f[()] reads the full dataset; Dataset.value was removed in h5py 3.0
    elif '_unit' in f.attrs:
        return _cast_value_type(f[()], value_type,
                                unit=f.attrs['_unit'])
    else:
        return _cast_value_type(f[()], value_type)
def _evaluate_key(f):
"""
Evaluate the key of f and handle non-string data types.
"""
name = os.path.basename(f.name) # to return only name of this level
if '_key_type' in f.attrs:
key_type = f.attrs['_key_type']
if isinstance(key_type, bytes):
key_type = str(key_type, 'utf-8')
if key_type not in ['str', 'unicode', 'string_']:
name = ast.literal_eval(name)
return name
def _load_custom_shape(f):
    """
    Reshape a flattened array with unequal sub-array lengths back into
    its original ragged (1d-of-sequences) shape.

    Uses the 'oldshape' and 'custom_value_types' attributes written by
    `_create_dataset` to split the flat data and cast each piece, then
    rebuilds the original container type from the '_value_type' attribute.
    """
    container_type = f.attrs['_value_type']
    if isinstance(container_type, bytes):
        container_type = str(container_type, 'utf-8')
    value = f[()]  # Dataset.value was removed in h5py 3.0
    # np.str_ is the NumPy 2.0-compatible spelling of np.unicode_
    element_types = f.attrs['custom_value_types'].astype(np.str_)
    data_reshaped = []
    for (start, length), value_type in zip(lib.accumulate(f.attrs['oldshape']),
                                           element_types):
        data_reshaped.append(
            _cast_value_type(value[start:start + length], value_type))
    # rebuild the stored container type; the previous code used the loop
    # variable here, i.e. the type of the *last element*, which was wrong
    # for containers whose type differs from their elements' (and raised
    # NameError for empty containers)
    return eval(valuetype_dict[container_type])(data_reshaped)
def _cast_value_type(value, value_type, unit=None):
    """
    Casts `value` into the type named by `value_type` (see valuetype_dict).

    Parameters
    ----------
    value : loaded dataset contents (scalar, bytes or numpy array)
    value_type : str
        Type name as stored in the '_value_type' attribute.
    unit : str, optional
        If given, the value is reconstructed as a quantities.Quantity
        with this unit (requires the quantities package).

    Raises
    ------
    ImportError
        If `unit` is given but quantities is not installed.
    NotImplementedError
        If `value_type` is not a supported type.
    """
    if value_type not in valuetype_dict:
        raise NotImplementedError("Unsupported data type: "
                                  "{value_type}.".format(value_type=value_type))
    if unit:
        if not quantities_found:
            raise ImportError("Could not find quantities package, "
                              "please install the package and "
                              "reload the wrapper.")
        return eval(valuetype_dict[value_type])(value, unit)
    if value_type in ('list', 'tuple'):
        # decode h5py byte strings back to unicode
        # (np.str_ is the NumPy 2.0-compatible spelling of np.unicode_)
        if isinstance(value, np.ndarray) and value.dtype.kind == 'S':
            value = value.astype(np.str_)
        # ensures that all dimensions of the array are converted to the
        # correct value type
        return _array_to_type(value, value_type)
    if hasattr(value, 'decode'):
        value = value.decode()
    value = eval(valuetype_dict[value_type])(value)
    if isinstance(value, np.ndarray) and value.dtype.kind == 'S':
        value = value.astype(np.str_)
    return value
def _array_to_type(value, value_type):
    """
    Casts members of arrays to the specified type recursively.
    """
    constructor = eval(valuetype_dict[value_type])
    # nested (ragged 2d) data: convert each sub-array first
    if len(value) > 0 and isinstance(value[0], np.ndarray):
        return constructor(_array_to_type(sub, value_type) for sub in value)
    return constructor(value)
# Look-up table with supported datatypes: maps the type name recorded in a
# dataset's '_value_type' attribute to the expression (eval'd at load time)
# that yields the matching python/numpy constructor.
valuetype_dict = {
    'tuple': 'tuple',
    'list': 'list',
    'float': 'float',
    'int': 'int',
    'str': 'str',
    'bool': 'bool',
    'ndarray': 'np.array',
    'int64': 'np.int64',
    'float64': 'np.float64',
    'complex128': 'np.complex128',
    'Quantity': 'pq.Quantity',
}