parent c94fb86e2d
commit cc6477015a
2 changed files with 1 addition and 211 deletions
@@ -1,210 +0,0 @@
import os
import numpy as np
import collections.abc
from contextlib import closing

from common.file_helpers import mkdirs_exists_ok

class ColumnStoreReader():
  def __init__(self, path, mmap=False, allow_pickle=False, direct_io=False, prefix="", np_data=None):
    if not (path and os.path.isdir(path)):
      raise ValueError("Not a column store: {}".format(path))

    self._path = os.path.realpath(path)
    self._keys = os.listdir(self._path)
    self._mmap = mmap
    self._allow_pickle = allow_pickle
    self._direct_io = direct_io
    self._is_dict = 'columnstore' in self._keys
    self._prefix = prefix
    self._np_data = np_data

  @property
  def path(self):
    return self._path

  def close(self):
    pass

  def get(self, key):
    try:
      return self[key]
    except KeyError:
      return None

  def keys(self):
    if self._is_dict:
      if not self._np_data:
        self._np_data = self._load(os.path.join(self._path, 'columnstore'))
      # return top level keys by matching on prefix and splitting on '/'
      return {
        k[len(self._prefix):].split('/')[0]
        for k in self._np_data.keys()
        if k.startswith(self._prefix)
      }

    return list(self._keys)

  def iteritems(self):
    for k in self:
      yield (k, self[k])

  def itervalues(self):
    for k in self:
      yield self[k]

  def get_npy_path(self, key):
    """Gets a filesystem path for an npy file containing the specified array, | 
				
			||||
       or none if the column store does not contain key. | 
				
			||||
    """ | 
				
			||||
    if key in self:
      return os.path.join(self._path, key)
    else:
      return None

  def _load(self, path):
    if self._mmap:
      # note that direct i/o does nothing for mmap since file read/write interface is not used
      return np.load(path, mmap_mode='r', allow_pickle=self._allow_pickle, fix_imports=False)

    if self._direct_io:
      opener = lambda path, flags: os.open(path, os.O_RDONLY | os.O_DIRECT)
      with open(path, 'rb', buffering=0, opener=opener) as f:
        return np.load(f, allow_pickle=self._allow_pickle, fix_imports=False)

    return np.load(path, allow_pickle=self._allow_pickle, fix_imports=False)

  def __getitem__(self, key):
    try:
      if self._is_dict:
        path = os.path.join(self._path, 'columnstore')
      else:
        path = os.path.join(self._path, key)

      # TODO(mgraczyk): This implementation will need to change for zip.
      if not self._is_dict and os.path.isdir(path):
        return ColumnStoreReader(path)
      else:
        if self._is_dict and self._np_data:
          ret = self._np_data
        else:
          ret = self._load(path)

        if isinstance(ret, np.lib.npyio.NpzFile):
          if self._is_dict:
            if self._prefix+key in ret.keys():
              return ret[self._prefix+key]
            if any(k.startswith(self._prefix+key+"/") for k in ret.keys()):
              return ColumnStoreReader(self._path, mmap=self._mmap, allow_pickle=self._allow_pickle, direct_io=self._direct_io, prefix=self._prefix+key+"/", np_data=ret)
            raise KeyError(self._prefix+key)

          # if it was saved compressed, the file contains only arr_0; deref it
          return ret['arr_0']
        else:
          return ret
    except IOError:
      raise KeyError(key)

  def __contains__(self, item):
    try:
      self[item]
      return True
    except KeyError:
      return False

  def __len__(self):
    return len(self._keys)

  def __bool__(self):
    return bool(self._keys)

  def __iter__(self):
    return iter(self._keys)

  def __str__(self):
    return "ColumnStoreReader({})".format(str({k: "..." for k in self._keys}))

  def __enter__(self): return self
  def __exit__(self, type, value, traceback): self.close()

class ColumnStoreWriter():
  def __init__(self, path, allow_pickle=False):
    self._path = path
    self._allow_pickle = allow_pickle
    mkdirs_exists_ok(self._path)

  def map_column(self, path, dtype, shape):
    npy_path = os.path.join(self._path, path)
    mkdirs_exists_ok(os.path.dirname(npy_path))
    return np.lib.format.open_memmap(npy_path, mode='w+', dtype=dtype, shape=shape)

  def add_column(self, path, data, dtype=None, compression=False, overwrite=False):
    npy_path = os.path.join(self._path, path)
    mkdirs_exists_ok(os.path.dirname(npy_path))

    if overwrite:
      f = open(npy_path, "wb")
    else:
      f = os.fdopen(os.open(npy_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL), "wb")

    with closing(f) as f:
      data2 = np.array(data, copy=False, dtype=dtype)
      if compression:
        np.savez_compressed(f, data2)
      else:
        np.save(f, data2, allow_pickle=self._allow_pickle, fix_imports=False)

  def add_group(self, group_name):
    # TODO(mgraczyk): This implementation will need to change if we add zip or compression.
    return ColumnStoreWriter(os.path.join(self._path, group_name))

  def add_dict(self, data, dtypes=None, compression=False, overwrite=False):
    # default name exists to have backward compatibility with equivalent directory structure
    npy_path = os.path.join(self._path, "columnstore")
    mkdirs_exists_ok(os.path.dirname(npy_path))

    flat_dict = dict()
    _flatten_dict(flat_dict, "", data)
    for k, v in flat_dict.items():
      dtype = dtypes[k] if dtypes is not None and k in dtypes else None
      flat_dict[k] = np.array(v, copy=False, dtype=dtype)

    if overwrite:
      f = open(npy_path, "wb")
    else:
      f = os.fdopen(os.open(npy_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL), "wb")

    with closing(f) as f:
      if compression:
        np.savez_compressed(f, **flat_dict)
      else:
        np.savez(f, **flat_dict)

  def close(self):
    pass

  def __enter__(self): return self
  def __exit__(self, type, value, traceback): self.close()


def _flatten_dict(flat, ns, d):
  for k, v in d.items():
    p = (ns + "/" if len(ns) else "") + k
    if isinstance(v, collections.abc.Mapping):
      _flatten_dict(flat, p, v)
    else:
      flat[p] = v


def _save_dict_as_column_store(values, writer, compression):
  for k, v in values.items():
    if isinstance(v, collections.abc.Mapping):
      _save_dict_as_column_store(v, writer.add_group(k), compression)
    else:
      writer.add_column(k, v, compression=compression)


def save_dict_as_column_store(values, output_path, compression=False):
  with ColumnStoreWriter(output_path) as writer:
    _save_dict_as_column_store(values, writer, compression)
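
For reference, a minimal round-trip sketch of the API this commit removes. It assumes the module is importable as columnstore (the hunk does not show the deleted file's path) and that common.file_helpers is on the import path; the temp directory and sample data are illustrative only.

import os
import tempfile

import numpy as np

# assumption: module name for the deleted file
from columnstore import ColumnStoreReader, ColumnStoreWriter, save_dict_as_column_store

data = {"speed": np.arange(10.0), "gps": {"lat": [32.7], "lon": [-117.1]}}
base = tempfile.mkdtemp()  # fresh paths: add_column opens files with O_EXCL unless overwrite=True

# directory layout: each leaf array becomes its own npy file,
# nested dicts become subdirectories
tree = os.path.join(base, "tree")
save_dict_as_column_store(data, tree)
with ColumnStoreReader(tree) as reader:
  print(sorted(reader.keys()))  # ['gps', 'speed']
  print(reader["gps"]["lat"])   # nested stores come back as ColumnStoreReader

# single-file layout: add_dict flattens the nested dict into one npz named
# 'columnstore' with '/'-separated keys
flat = os.path.join(base, "flat")
with ColumnStoreWriter(flat) as writer:
  writer.add_dict(data)
with ColumnStoreReader(flat) as reader:
  print(reader["gps"]["lon"])   # same access pattern as the directory layout

The two layouts are interchangeable from the reader's point of view: __getitem__ dispatches on whether the store directory contains a 'columnstore' file.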