diff --git a/common/column_store.py b/common/column_store.py
index 17ae16d34e..6f162db1b1 100644
--- a/common/column_store.py
+++ b/common/column_store.py
@@ -6,7 +6,7 @@ from contextlib import closing
 from common.file_helpers import mkdirs_exists_ok
 
 
 class ColumnStoreReader():
-  def __init__(self, path, mmap=False, allow_pickle=False, direct_io=False):
+  def __init__(self, path, mmap=False, allow_pickle=False, direct_io=False, prefix="", np_data=None):
     if not (path and os.path.isdir(path)):
       raise ValueError("Not a column store: {}".format(path))
@@ -15,6 +15,9 @@ class ColumnStoreReader():
     self._mmap = mmap
     self._allow_pickle = allow_pickle
     self._direct_io = direct_io
+    self._is_dict = 'columnstore' in self._keys
+    self._prefix = prefix
+    self._np_data = np_data
 
   @property
   def path(self):
@@ -30,6 +33,11 @@ class ColumnStoreReader():
     return None
 
   def keys(self):
+    if self._is_dict:
+      if self._np_data is None:
+        self._np_data = self._load(os.path.join(self._path, 'columnstore'))
+      return self._np_data.keys()
+
     return list(self._keys)
 
   def iteritems(self):
@@ -49,25 +57,42 @@
       else:
         return None
 
+  def _load(self, path):
+    if self._mmap:
+      # note that direct i/o does nothing for mmap since file read/write interface is not used
+      return np.load(path, mmap_mode='r', allow_pickle=self._allow_pickle, fix_imports=False)
+
+    if self._direct_io:
+      opener = lambda path, flags: os.open(path, os.O_RDONLY | os.O_DIRECT)
+      with open(path, 'rb', buffering=0, opener=opener) as f:
+        return np.load(f, allow_pickle=self._allow_pickle, fix_imports=False)
+
+    return np.load(path, allow_pickle=self._allow_pickle, fix_imports=False)
+
   def __getitem__(self, key):
     try:
-      path = os.path.join(self._path, key)
+      if self._is_dict:
+        path = os.path.join(self._path, 'columnstore')
+      else:
+        path = os.path.join(self._path, key)
 
       # TODO(mgraczyk): This implementation will need to change for zip.
-      if os.path.isdir(path):
+      if not self._is_dict and os.path.isdir(path):
         return ColumnStoreReader(path)
       else:
-        if self._mmap:
-          # note that direct i/o does nothing for mmap since file read/write interface is not used
-          ret = np.load(path, mmap_mode='r', allow_pickle=self._allow_pickle, fix_imports=False)
+        if self._is_dict and self._np_data is not None:
+          ret = self._np_data
         else:
-          if self._direct_io:
-            opener = lambda path, flags: os.open(path, os.O_RDONLY | os.O_DIRECT)
-            with open(path, 'rb', buffering=0, opener=opener) as f:
-              ret = np.load(f, allow_pickle=self._allow_pickle, fix_imports=False)
-          else:
-            ret = np.load(path, allow_pickle=self._allow_pickle, fix_imports=False)
+          ret = self._load(path)
+
         if type(ret) == np.lib.npyio.NpzFile:
+          if self._is_dict:
+            if self._prefix+key in ret.keys():
+              return ret[self._prefix+key]
+            if any(k.startswith(self._prefix+key+"/") for k in ret.keys()):
+              return ColumnStoreReader(self._path, mmap=self._mmap, allow_pickle=self._allow_pickle, direct_io=self._direct_io, prefix=self._prefix+key+"/", np_data=ret)
+            raise KeyError(self._prefix+key)
+
           # if it's saved as compressed, it has arr_0 only in the file. deref this
           return ret['arr_0']
         else:
@@ -128,6 +153,27 @@ class ColumnStoreWriter():
     # TODO(mgraczyk): This implementation will need to change if we add zip or compression.
     return ColumnStoreWriter(os.path.join(self._path, group_name))
 
+  def add_dict(self, data, dtype=None, compression=False, overwrite=False):
+    # default name exists to have backward compatibility with equivalent directory structure
+    npy_path = os.path.join(self._path, "columnstore")
+    mkdirs_exists_ok(os.path.dirname(npy_path))
+
+    flat_dict = dict()
+    _flatten_dict(flat_dict, "", data)
+    for k, v in flat_dict.items():
+      flat_dict[k] = np.asarray(v, dtype=dtype)
+
+    if overwrite:
+      f = open(npy_path, "wb")
+    else:
+      f = os.fdopen(os.open(npy_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL), "wb")
+
+    with closing(f) as f:
+      if compression:
+        np.savez_compressed(f, **flat_dict)
+      else:
+        np.savez(f, **flat_dict)
+
   def close(self):
     pass
 
@@ -135,6 +181,15 @@ class ColumnStoreWriter():
   def __exit__(self, type, value, traceback):
     self.close()
 
+def _flatten_dict(flat, ns, d):
+  for k, v in d.items():
+    p = (ns + "/" if len(ns) else "") + k
+    if isinstance(v, collections.abc.Mapping):
+      _flatten_dict(flat, p, v)
+    else:
+      flat[p] = v
+
+
 def _save_dict_as_column_store(values, writer, compression):
   for k, v in values.items():
     if isinstance(v, collections.Mapping):
@@ -147,4 +202,3 @@ def save_dict_as_column_store(values, output_path, compression=False):
   with ColumnStoreWriter(output_path) as writer:
     _save_dict_as_column_store(values, writer, compression)
-