import collections.abc
import os
from contextlib import closing

import numpy as np

from common.file_helpers import mkdirs_exists_ok


class ColumnStoreReader:
  """Reads a column store from disk.

  A column store is either a directory tree of .npy files (one file per
  column, with subdirectories for groups) or a single 'columnstore' npz file
  holding a flattened dict, as written by ColumnStoreWriter.add_dict.
  """

  def __init__(self, path, mmap=False, allow_pickle=False, direct_io=False, prefix="", np_data=None):
    if not (path and os.path.isdir(path)):
      raise ValueError("Not a column store: {}".format(path))
    self._path = os.path.realpath(path)
    self._keys = os.listdir(self._path)
    self._mmap = mmap
    self._allow_pickle = allow_pickle
    self._direct_io = direct_io
    self._is_dict = 'columnstore' in self._keys
    self._prefix = prefix
    self._np_data = np_data

  @property
  def path(self):
    return self._path

  def close(self):
    pass

  def get(self, key):
    try:
      return self[key]
    except KeyError:
      return None

  def keys(self):
    if self._is_dict:
      if not self._np_data:
        self._np_data = self._load(os.path.join(self._path, 'columnstore'))
      # return top level keys by matching on prefix and splitting on '/'
      return {k[len(self._prefix):].split('/')[0]
              for k in self._np_data.keys() if k.startswith(self._prefix)}
    return list(self._keys)

  def iteritems(self):
    for k in self:
      yield (k, self[k])

  def itervalues(self):
    for k in self:
      yield self[k]

  def get_npy_path(self, key):
    """Gets a filesystem path for an npy file containing the specified array,
    or None if the column store does not contain key.
    """
    if key in self:
      return os.path.join(self._path, key)
    else:
      return None

  def _load(self, path):
    if self._mmap:
      # note that direct I/O does nothing for mmap since the file read/write interface is not used
      return np.load(path, mmap_mode='r', allow_pickle=self._allow_pickle, fix_imports=False)
    if self._direct_io:
      # O_DIRECT bypasses the page cache (Linux only)
      opener = lambda p, flags: os.open(p, os.O_RDONLY | os.O_DIRECT)
      with open(path, 'rb', buffering=0, opener=opener) as f:
        return np.load(f, allow_pickle=self._allow_pickle, fix_imports=False)
    return np.load(path, allow_pickle=self._allow_pickle, fix_imports=False)

  def __getitem__(self, key):
    try:
      if self._is_dict:
        path = os.path.join(self._path, 'columnstore')
      else:
        path = os.path.join(self._path, key)

      # TODO(mgraczyk): This implementation will need to change for zip.
      if not self._is_dict and os.path.isdir(path):
        return ColumnStoreReader(path, mmap=self._mmap, allow_pickle=self._allow_pickle,
                                 direct_io=self._direct_io)
      else:
        if self._is_dict and self._np_data:
          ret = self._np_data
        else:
          ret = self._load(path)
        if isinstance(ret, np.lib.npyio.NpzFile):
          if self._is_dict:
            if self._prefix + key in ret.keys():
              return ret[self._prefix + key]
            if any(k.startswith(self._prefix + key + "/") for k in ret.keys()):
              return ColumnStoreReader(self._path, mmap=self._mmap, allow_pickle=self._allow_pickle,
                                       direct_io=self._direct_io, prefix=self._prefix + key + "/",
                                       np_data=ret)
            raise KeyError(self._prefix + key)
          # if it's saved as compressed, the file only contains arr_0. deref this
          return ret['arr_0']
        else:
          return ret
    except IOError:
      raise KeyError(key)

  def __contains__(self, item):
    try:
      self[item]
      return True
    except KeyError:
      return False

  def __len__(self):
    return len(self._keys)

  def __bool__(self):
    return bool(self._keys)

  def __iter__(self):
    return iter(self._keys)

  def __str__(self):
    return "ColumnStoreReader({})".format(str({k: "..." for k in self._keys}))
  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_value, traceback):
    self.close()


class ColumnStoreWriter:
  """Writes a column store: one .npy file per column under a directory tree,
  or a single flattened 'columnstore' npz file via add_dict.
  """

  def __init__(self, path, allow_pickle=False):
    self._path = path
    self._allow_pickle = allow_pickle
    mkdirs_exists_ok(self._path)

  def map_column(self, path, dtype, shape):
    npy_path = os.path.join(self._path, path)
    mkdirs_exists_ok(os.path.dirname(npy_path))
    return np.lib.format.open_memmap(npy_path, mode='w+', dtype=dtype, shape=shape)

  def add_column(self, path, data, dtype=None, compression=False, overwrite=False):
    npy_path = os.path.join(self._path, path)
    mkdirs_exists_ok(os.path.dirname(npy_path))
    if overwrite:
      f = open(npy_path, "wb")
    else:
      # fail if the column already exists
      f = os.fdopen(os.open(npy_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL), "wb")
    with closing(f):
      data2 = np.asarray(data, dtype=dtype)
      if compression:
        np.savez_compressed(f, data2)
      else:
        np.save(f, data2, allow_pickle=self._allow_pickle, fix_imports=False)

  def add_group(self, group_name):
    # TODO(mgraczyk): This implementation will need to change if we add zip or compression.
    return ColumnStoreWriter(os.path.join(self._path, group_name))

  def add_dict(self, data, dtypes=None, compression=False, overwrite=False):
    # the default name exists for backward compatibility with the equivalent directory structure
    npy_path = os.path.join(self._path, "columnstore")
    mkdirs_exists_ok(os.path.dirname(npy_path))

    flat_dict = dict()
    _flatten_dict(flat_dict, "", data)
    for k, v in flat_dict.items():
      dtype = dtypes[k] if dtypes is not None and k in dtypes else None
      flat_dict[k] = np.asarray(v, dtype=dtype)

    if overwrite:
      f = open(npy_path, "wb")
    else:
      # fail if the store already exists
      f = os.fdopen(os.open(npy_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL), "wb")
    with closing(f):
      if compression:
        np.savez_compressed(f, **flat_dict)
      else:
        np.savez(f, **flat_dict)

  def close(self):
    pass

  def __enter__(self):
    return self

  def __exit__(self, exc_type, exc_value, traceback):
    self.close()


def _flatten_dict(flat, ns, d):
  # flatten nested mappings into a single dict with '/'-separated keys
  for k, v in d.items():
    p = (ns + "/" if len(ns) else "") + k
    if isinstance(v, collections.abc.Mapping):
      _flatten_dict(flat, p, v)
    else:
      flat[p] = v


def _save_dict_as_column_store(values, writer, compression):
  for k, v in values.items():
    if isinstance(v, collections.abc.Mapping):
      _save_dict_as_column_store(v, writer.add_group(k), compression)
    else:
      writer.add_column(k, v, compression=compression)


def save_dict_as_column_store(values, output_path, compression=False):
  """Writes a (possibly nested) dict of arrays as a directory column store."""
  with ColumnStoreWriter(output_path) as writer:
    _save_dict_as_column_store(values, writer, compression)
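
# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative, not part of the module API): round-trips
# a nested dict through both on-disk layouts. The example data and paths below
# are hypothetical.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
  import tempfile

  example = {
    "speed": [1.0, 2.0, 3.0],
    "gps": {"lat": [52.0, 52.1], "lon": [4.0, 4.1]},
  }

  with tempfile.TemporaryDirectory() as tmp:
    # Layout 1: a directory tree with one .npy file per column.
    tree_path = os.path.join(tmp, "tree_store")
    save_dict_as_column_store(example, tree_path)
    with ColumnStoreReader(tree_path) as reader:
      assert set(reader.keys()) == {"speed", "gps"}
      print(reader["speed"])       # array([1., 2., 3.])
      print(reader["gps"]["lat"])  # reader["gps"] is a nested ColumnStoreReader

    # Layout 2: a single flattened 'columnstore' npz written by add_dict.
    dict_path = os.path.join(tmp, "dict_store")
    with ColumnStoreWriter(dict_path) as writer:
      writer.add_dict(example)
    with ColumnStoreReader(dict_path) as reader:
      assert set(reader.keys()) == {"speed", "gps"}
      print(reader["gps"]["lon"])  # nested access resolves 'gps/lon' inside the npz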