obfuscation_functions_mode.py
# Copyright (c) 2019-present, Facebook, Inc.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
#

from itertools import chain
from logging import getLogger

import submitit

from codegen_sources.preprocessing.bpe_modes.bpe_mode import TMP_EXT
from codegen_sources.preprocessing.dataset_modes.dataset_mode import (
    DATASET_SPLITS,
    DatasetMode,
)
from codegen_sources.preprocessing.lang_processors.lang_processor import LangProcessor
from codegen_sources.preprocessing.obfuscation.utils_deobfuscation import (
    REPLACE_DICT,
    cleanup_obfuscated_function,
)
from codegen_sources.preprocessing.timeout import timeout
from submitit import Executor, LocalExecutor

OUTLIER_INDICES_THRESHOLDS = {"VAR_": 200, "FUNC_": 200, "CLASS_": 100}
FUNC_OBFUSCATION_SUFFIXES = ["obfuscated_func", "dictionary_func"]

logger = getLogger()


class ObfuscationFunctionsMode(DatasetMode):
    """
    Callable where we track the repos processed so that we can checkpoint with submitit
    """

    def __init__(
        self,
        folder,
        languages,
        bpe,
        processed_lines: set = None,
        nb_train_split: int = 8,
        keep_comments: bool = False,
    ):
        super().__init__(
            suffixes=FUNC_OBFUSCATION_SUFFIXES,
            folder=folder,
            languages=languages,
            bpe=bpe,
            parallel_dataset=True,
            processed_lines=processed_lines,
            nb_train_split=nb_train_split,
            keep_comments=keep_comments,
        )

    def checkpoint(
        self, input_path: str, process_strings: bool
    ) -> submitit.helpers.DelayedSubmission:
        # Called by submitit when the job is preempted or times out: resubmit a
        # fresh instance of this mode carrying the lines processed so far, so
        # that processing resumes where it stopped.
        return submitit.helpers.DelayedSubmission(
            self.__class__(
                self.folder, self.languages, self.bpe, self.processed_lines,
            ),
            input_path,
            process_strings,
        )

    @timeout(60)
    def extract_data_for_line(
        self,
        line_id: int,
        json_line: dict,
        process_strings: bool,
        lang_processor: LangProcessor,
    ):
        # Obfuscate one source file, tokenize it, then split it into functions
        # paired with their identifier dictionaries.
        default_return = line_id, None, None
        if "content" not in json_line:
            return default_return
        content = json_line["content"]
        for k, v in REPLACE_DICT.items():
            content = content.replace(k, v)
        try:
            obfuscated, dico = lang_processor.obfuscate_code(content)
            tokenized_obfuscated_file = " ".join(
                lang_processor.tokenize_code(
                    obfuscated,
                    process_strings=process_strings,
                    keep_comments=self.keep_comments,
                )
            )
        except NotImplementedError:
            logger.error(
                f"Obfuscate method is not implemented for {lang_processor.__class__.__name__}"
            )
            raise
        except KeyboardInterrupt:
            raise
        except Exception as e:
            logger.warning(f"Error obfuscating content {e} \n")
            return default_return

        obfuscated_functions = []
        func_dicos = []
        try:
            f_standalone, f_class = lang_processor.extract_functions(
                tokenized_obfuscated_file
            )
            functions = f_standalone + f_class
            for func in functions:
                func, func_dico = cleanup_obfuscated_function(func, dico)
                obfuscated_functions.append(func)
                func_dicos.append(func_dico)
            assert len(obfuscated_functions) == len(func_dicos)
        except KeyboardInterrupt:
            raise
        except Exception as e:
            logger.warning(f"error {e} extracting functions\n")
            return default_return
        return (
            line_id,
            json_line["repo_name"],
            {"obfuscated_func": obfuscated_functions, "dictionary_func": func_dicos},
        )

    def filter(self, tokenized_data):
        # Detect outlier examples: return True as soon as any function
        # dictionary contains an identifier index at one of the thresholds
        # above (e.g. "VAR_200"), i.e. an unusually large number of
        # obfuscated identifiers.
        assert all(s in tokenized_data for s in self.suffixes)
        assert len(tokenized_data["dictionary_func"]) == len(
            tokenized_data["obfuscated_func"]
        )
        for var_prefix, var_number in OUTLIER_INDICES_THRESHOLDS.items():
            for dico in tokenized_data["dictionary_func"]:
                if f"{var_prefix}{var_number}" in dico:
                    return True
        return False
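
    # Illustrative sketch (not part of the original module): `filter` flags a
    # tokenized example as soon as one of its dictionaries contains an
    # identifier index at a threshold above, e.g. "VAR_200". The dictionary
    # serialization below is only indicative.
    #
    #   mode.filter(
    #       {
    #           "obfuscated_func": ["int FUNC_0 ( int VAR_0 ) { ... }"],
    #           "dictionary_func": ["FUNC_0 add | VAR_0 x | ... | VAR_200 tmp"],
    #       }
    #   )  # -> True (unusually many distinct variables)
    #
    # Examples whose dictionaries stay below every threshold return False.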

    def _learn_bpe(self, ncodes: int, executor: Executor = None):
        raise Exception(
            "BPE codes should not be learnt from obfuscated data; learn them on monolingual data. "
            "Please provide BPE codes or learn them. "
            "To do so, run the pipeline in monolingual mode up to the BPE learning step."
        )

    def apply_bpe(self, executor: Executor = None, local_parallelism: int = None):
        """
        Override the parent method: in obfuscation mode, the BPE has to be
        repaired on the obfuscation special tokens after it is applied.
        """
        logger.info("")
        logger.info("")
        logger.info("========== Apply BPE ===========")
        if executor is None:
            executor = LocalExecutor(folder=self.folder.joinpath("log"))

        # apply BPE with tmp suffix
        _bpe_ext = self.bpe.ext
        self.bpe.ext += TMP_EXT
        super().apply_bpe(executor)
        self.bpe.ext = _bpe_ext

        # restore BPE on obfuscation special tokens
        jobs = []
        to_restore = list(
            chain(
                *[
                    self.folder.glob(f"{lang}.{split}.*{self.bpe.ext}{TMP_EXT}")
                    for split in DATASET_SPLITS
                    for lang in self.languages
                ]
            )
        )
        for f in to_restore:
            job = executor.submit(
                self.bpe.repair_bpe_for_obfuscation_file, f, f.with_suffix("")
            )
            jobs.append(job)
        for job in jobs:
            job.result()
        for f in to_restore:
            assert f.with_suffix("").is_file()
            f.unlink()

    def _get_vocab(self, executor: Executor = None):
        raise Exception(
            "The vocabulary should not be learnt from obfuscated data; learn it on monolingual data. "
            "Please provide a vocabulary or learn it. "
            "To do so, run the pipeline in monolingual mode up to the get_vocab step."
        )
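

# ---------------------------------------------------------------------------
# Hypothetical usage sketch (not part of the original module). The folder
# path, the `bpe` mode object and the Java language processor below are
# placeholders assumed to be built elsewhere by the preprocessing pipeline.
#
#   from pathlib import Path
#
#   mode = ObfuscationFunctionsMode(
#       folder=Path("data/obfuscation"),  # placeholder output folder
#       languages=["java"],
#       bpe=bpe,                          # BPE mode provided by the pipeline
#   )
#
#   # Obfuscate a single file and split it into functions + dictionaries:
#   line_id, repo, data = mode.extract_data_for_line(
#       line_id=0,
#       json_line={"repo_name": "some/repo", "content": java_source},
#       process_strings=True,
#       lang_processor=java_processor,    # a LangProcessor for Java
#   )
#
#   # Once the tokenized splits are written to disk, apply BPE and repair the
#   # obfuscation special tokens:
#   mode.apply_bpe()
# ---------------------------------------------------------------------------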