 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Dict, List
+import re
+from typing import Dict, List, Union, cast
 
-from transformers.tokenization_utils_base import PreTrainedTokenizerBase
+import numpy as np
+from transformers import AutoTokenizer, PreTrainedTokenizerBase
+
+lorem_text = (
+    "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor "
+    "incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis "
+    "nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. "
+    "Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore "
+    "eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt "
+    "in culpa qui officia deserunt mollit anim id est laborum."
+)
+words = np.array(list(set(re.findall(r"\b[a-zA-Z]+\b", lorem_text))))
 
 
 def texts_to_hashes(
-    tokenizer: PreTrainedTokenizerBase, texts: List[str], block_size: int = 512
+    tokenizer: Union[str, PreTrainedTokenizerBase],
+    texts: List[str],
+    block_size: int = 512,
 ) -> List[List[int]]:
     """
     Tokenizes a list of strings (without special tokens), splits tokens into blocks,
     computes rolling hashes, and returns a list of lists of integer-mapped rolling hashes
     for each input string.
 
     Args:
-        tokenizer: Tokenizer object with a .encode method.
+        tokenizer: Tokenizer object with a .encode method or string name to load from HuggingFace.
         texts (List[str]): List of input strings.
         block_size (int): Size of each token block for hashing.
 
     Returns:
         List[List[int]]: List of lists of integer-mapped rolling hashes for each block of each input string.
     """
+    # Load tokenizer if string is provided
+    if isinstance(tokenizer, str):
+        tokenizer = cast(
+            PreTrainedTokenizerBase, AutoTokenizer.from_pretrained(tokenizer)
+        )
+
     # Batch tokenize for efficiency
     batch_encoding = tokenizer(
         texts,
@@ -71,3 +91,76 @@ def texts_to_hashes( |
         results.append(hashes)
 
     return results
+
+
+def hashes_to_texts(
+    tokenizer: Union[str, PreTrainedTokenizerBase],
+    hash_ids_list: List[List[int]],
+    input_lengths: List[int],
+    block_size: int = 512,
+) -> List[str]:
+    """
+    Converts a list of hash ID sequences back to text strings using a global token mapping.
+
+    Args:
+        tokenizer: Tokenizer object with a .decode method or string name to load from HuggingFace.
+        hash_ids_list (List[List[int]]): List of hash ID sequences for each input.
+        input_lengths (List[int]): Target input lengths for each sequence.
+        block_size (int): Size of each token block for reconstruction.
+
+    Returns:
+        List[str]: List of reconstructed text strings.
+    """
+    # Load tokenizer if string is provided
+    if isinstance(tokenizer, str):
+        tokenizer = cast(
+            PreTrainedTokenizerBase, AutoTokenizer.from_pretrained(tokenizer)
+        )
+
+    results: List[str] = []
+    _hash_id_to_tokens: Dict[int, np.ndarray] = {}
+
+    for hash_ids, input_len in zip(hash_ids_list, input_lengths):
+        # Verify constraint: len(hash_ids) * block_size >= input_len
+        if len(hash_ids) * block_size < input_len:
+            raise ValueError(
+                f"Constraint violation: len(hash_ids) * block_size ({len(hash_ids) * block_size}) < input_len ({input_len})"
+            )
+
+        token_arrays: List[np.ndarray] = []
+
+        for hash_id in hash_ids:
+            # Determine the block size for this hash_id
+            remaining_tokens = input_len - sum(len(arr) for arr in token_arrays)
+            current_block_size = min(block_size, remaining_tokens)
+
+            if current_block_size <= 0:
+                break
+
+            # Check if hash_id already exists in global dict
+            if hash_id in _hash_id_to_tokens:
+                # Use existing array, but assert it matches current_block_size
+                existing_array = _hash_id_to_tokens[hash_id]
+                assert (
+                    len(existing_array) == current_block_size
+                ), f"Existing array length {len(existing_array)} does not match current block size {current_block_size}"
+                token_array = existing_array
+            else:
+                # Generate new random array by sampling words, tokenizing, and taking first tokens
+                sampled_words = np.random.choice(words, size=current_block_size)
+                sampled_text = " ".join(sampled_words)
+                tokens = tokenizer.encode(sampled_text, add_special_tokens=False)
+                token_array = np.array(tokens[:current_block_size], dtype=np.int32)
+                if getattr(tokenizer, "bos_token_id", None) is not None:
+                    token_array[0] = tokenizer.bos_token_id
+                _hash_id_to_tokens[hash_id] = token_array
+
+            token_arrays.append(token_array)
+
+        all_tokens = np.concatenate(token_arrays)
+
+        # Decode to text
+        text = tokenizer.decode(all_tokens, skip_special_tokens=False)
+        results.append(text)
+
+    return results
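
For reviewers, a minimal usage sketch of the hashing direction. The import path token_hashes and the gpt2 tokenizer name are illustrative assumptions, not part of this diff; any module exposing these helpers and any Hugging Face tokenizer name would do.

# Hypothetical import path; substitute the module this file actually lives in.
from token_hashes import texts_to_hashes

texts = [
    "The quick brown fox jumps over the lazy dog.",
    "The quick brown fox jumps over the lazy dog.",
]
# Passing a string now loads the tokenizer via AutoTokenizer.from_pretrained.
hash_ids = texts_to_hashes("gpt2", texts, block_size=8)
# One list of block-level hash IDs per input; identical inputs should yield
# identical ID sequences within a single call.
print(hash_ids)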
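
The reverse direction regenerates synthetic lorem-ipsum-style prompts whose block structure matches the hash IDs; equal IDs reuse the same cached token block. The input lengths below assume full blocks purely for illustration; the code only requires len(hash_ids) * block_size >= input_len for each sequence.

# Continuing the sketch above (same hypothetical import path).
from token_hashes import hashes_to_texts

input_lengths = [len(ids) * 8 for ids in hash_ids]  # full blocks at block_size=8
synthetic = hashes_to_texts("gpt2", hash_ids, input_lengths, block_size=8)
for text in synthetic:
    print(text[:80])  # words sampled from lorem_text, decoded back to text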