From bfcab0462ec539851f240a638c284b175e9e9e2f Mon Sep 17 00:00:00 2001 From: Ohad Rubin Date: Thu, 27 Aug 2020 21:32:39 +0300 Subject: [PATCH 1/3] before pull-req --- allennlp/common/mmap.py | 251 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 251 insertions(+) create mode 100644 allennlp/common/mmap.py diff --git a/allennlp/common/mmap.py b/allennlp/common/mmap.py new file mode 100644 index 00000000000..0dd98217ff0 --- /dev/null +++ b/allennlp/common/mmap.py @@ -0,0 +1,251 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +from functools import lru_cache +import os +import shutil +import struct +import numpy as np +import torch +from allennlp.data.fields import DataArray + +dtypes = { + 1: np.uint8, + 2: np.int8, + 3: np.int16, + 4: np.int32, + 5: np.int64, + 6: np.float, + 7: np.double, + 8: np.uint16, +} + + +def code(dtype): + for k in dtypes.keys(): + if dtypes[k] == dtype: + return k + raise ValueError(dtype) + + +def index_file_path(prefix_path): + return f"{prefix_path}.idx" + + +def data_file_path(prefix_path): + return f"{prefix_path}.bin" + + +def _warmup_mmap_file(path): + with open(path, "rb") as stream: + while stream.read(100 * 1024 * 1024): + pass + + +class MMapIndexedCache(torch.utils.data.Dataset): + class Index(object): + _HDR_MAGIC = b"MMIDIDX\x00\x00" + + @classmethod + def writer(cls, path, dtype): + class _Writer(object): + def __enter__(self): + self._file = open(path, "wb") + + self._file.write(cls._HDR_MAGIC) + self._file.write(struct.pack(" Date: Sun, 30 Aug 2020 05:04:32 +0300 Subject: [PATCH 2/3] added MMapCache, according to the API --- allennlp/common/mmap.py | 91 ++++++++++++++++++++++++++++++++++------- 1 file changed, 76 insertions(+), 15 deletions(-) diff --git a/allennlp/common/mmap.py b/allennlp/common/mmap.py index 0dd98217ff0..d20f425dc13 100644 --- a/allennlp/common/mmap.py +++ b/allennlp/common/mmap.py @@ -44,7 +44,7 @@ def _warmup_mmap_file(path): pass -class MMapIndexedCache(torch.utils.data.Dataset): +class MMapCache: class Index(object): _HDR_MAGIC = b"MMIDIDX\x00\x00" @@ -174,10 +174,11 @@ def __len__(self): @lru_cache(maxsize=8) def __getitem__(self, i): ptr, size = self._index[i] + # self._index.dtype will be different everytime np_array = np.frombuffer(self._bin_buffer, dtype=self._index.dtype, count=size, offset=ptr) if self._index.dtype != np.int64: np_array = np_array.astype(np.int64) - + #To tensor_dict() here. return torch.from_numpy(np_array) @property @@ -189,15 +190,12 @@ def exists(path): return os.path.exists(index_file_path(path)) and os.path.exists(data_file_path(path)) -class MMapIndexedCacheBuilder(object): - def __init__(self, out_file, vocab_size=None): +class MMapCacheBuilder(object): + def __init__(self, out_file): self._data_file = open(out_file, "wb") self._sizes = [] self._field_names = None - if vocab_size is not None and vocab_size < 65500: - self._dtype = np.uint16 - else: - self._dtype = np.int32 + self._dtype = np.int32 def add_instance(self, instance: Instance): tensor_dict = instance.as_tensor_dict() @@ -206,10 +204,8 @@ def add_instance(self, instance: Instance): self._field_names = list(sorted(flattened_dict.keys())) assert self._field_names # TODO: what if some instances have a different set of field names, i.e missing some, for test instances, we don't have supervision..... - # for now we will just write the name of every field next to the data. - - for key, value in flattened_dict.items(): - self.add_item(key, value) + # for now we will just write the name of every field next_field_names + self.add_tensor(key, value) @classmethod def flatten_dict(cls, tensor_dict: Dict, prefix=None): @@ -224,7 +220,7 @@ def flatten_dict(cls, tensor_dict: Dict, prefix=None): raise ValueError("You gave me a MetadataField") return flat_dict - def add_item(self, name, tensor): + def add_tensor(self, name, tensor): np_array = tensor.contiguous().detach().numpy() np_array_b = np_array.tobytes(order="C") name_b = name.encode() @@ -234,7 +230,7 @@ def add_item(self, name, tensor): def merge_file_(self, another_file): # Concatenate index - index = MMapIndexedCache.Index(index_file_path(another_file)) + index = MMapCache.Index(index_file_path(another_file)) assert index.dtype == self._dtype for size in index.sizes: @@ -247,5 +243,70 @@ def merge_file_(self, another_file): def finalize(self, index_file): self._data_file.close() - with MMapIndexedCache.Index.writer(index_file, self._dtype) as index: + with MMapCache.Index.writer(index_file, self._dtype) as index: index.write(self._sizes) + + + +class MMapCache: + def __init__( + self, + cache_path: str, + ) -> None: + self.cache_path = cache_path + self._builder = None + self._cache = None + + if os.path.exists(self.cache_path)): + if self.is_finalized(self.cache_path): + #scenario 2, we can read. + self._cache = MMapCache(self.cache_path) + else: + #scenario 3, another training process is currently writing to it or was interrupted while it was writing. + pass + else: + self._builder = MMapCacheBuilder(self.cache_path) + #scenario 1, we need to write to it. + + def get_instances( + self, + data_path: str, + ) -> Optional[Iterable[Dict[str, DataArray]]]: + #dont need data_path here + assert self._cache + for i in range(len(self._cache)): + yield self._cache[i] + + + def set_instances( + self, + instances: Iterable[Dict[str, DataArray]], + ) -> Iterable[Dict[str, DataArray]]: + assert self._builder: + for instance in instances: + self._builder.add_instance(instance) + return instances + + + + def get_vocabulary(self) -> Optional[Vocabulary]: + pass + + def set_vocabulary(self, vocab: Vocabulary) -> None: + pass + + def finalize(self) -> None: + pass + + @classmethod + def hash_config(cls, config: Params) -> str: + pass + + @classmethod + def is_finalized(cls,path): + return True + + + + # Similar to the DatasetReader class, the Cache class will also have + # getters and setters for WorkerInfo and DistributedInfo. \ No newline at end of file From f782c9c2e012f27f40336396f35715f240295e1b Mon Sep 17 00:00:00 2001 From: Ohad Rubin Date: Sun, 30 Aug 2020 05:05:48 +0300 Subject: [PATCH 3/3] added MMapCache, according to the API --- allennlp/common/mmap.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/allennlp/common/mmap.py b/allennlp/common/mmap.py index d20f425dc13..ef81c756c24 100644 --- a/allennlp/common/mmap.py +++ b/allennlp/common/mmap.py @@ -44,7 +44,7 @@ def _warmup_mmap_file(path): pass -class MMapCache: +class MMapCacheReader: class Index(object): _HDR_MAGIC = b"MMIDIDX\x00\x00" @@ -230,7 +230,7 @@ def add_tensor(self, name, tensor): def merge_file_(self, another_file): # Concatenate index - index = MMapCache.Index(index_file_path(another_file)) + index = MMapCacheReader.Index(index_file_path(another_file)) assert index.dtype == self._dtype for size in index.sizes: @@ -243,7 +243,7 @@ def merge_file_(self, another_file): def finalize(self, index_file): self._data_file.close() - with MMapCache.Index.writer(index_file, self._dtype) as index: + with MMapCacheReader.Index.writer(index_file, self._dtype) as index: index.write(self._sizes) @@ -260,7 +260,7 @@ def __init__( if os.path.exists(self.cache_path)): if self.is_finalized(self.cache_path): #scenario 2, we can read. - self._cache = MMapCache(self.cache_path) + self._cache = MMapCacheReader(self.cache_path) else: #scenario 3, another training process is currently writing to it or was interrupted while it was writing. pass